This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 20dc8f9b089 [improve][broker] PIP-483: scalable topic auto split/merge 
(#25980)
20dc8f9b089 is described below

commit 20dc8f9b0892150ebc83032071628d16e5e294f1
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 11 19:34:45 2026 -0700

    [improve][broker] PIP-483: scalable topic auto split/merge (#25980)
---
 .../apache/pulsar/broker/ServiceConfiguration.java | 146 +++++++++
 .../broker/resources/ScalableTopicResources.java   |  58 +++-
 .../pulsar/broker/service/BrokerService.java       |  94 +++++-
 .../broker/service/scalable/AutoScaleConfig.java   | 146 +++++++++
 .../broker/service/scalable/AutoScaleDecision.java |  42 +++
 .../service/scalable/AutoScalePolicyEvaluator.java | 257 ++++++++++++++++
 .../service/scalable/ScalableTopicController.java  | 252 +++++++++++++++-
 .../broker/service/scalable/SegmentLayout.java     |  41 +++
 .../service/scalable/SegmentLoadReporter.java      | 141 +++++++++
 .../broker/service/scalable/SegmentLoadSample.java |  35 +++
 .../service/scalable/AutoScaleConfigTest.java      | 125 ++++++++
 .../scalable/AutoScalePolicyEvaluatorTest.java     | 335 +++++++++++++++++++++
 .../ScalableTopicControllerAutoScaleTest.java      | 286 ++++++++++++++++++
 .../scalable/ScalableTopicControllerTest.java      |   7 +
 .../service/scalable/ScalableTopicServiceTest.java |  11 +
 .../broker/service/scalable/SegmentLayoutTest.java |  33 ++
 .../service/scalable/SegmentLoadReporterTest.java  | 176 +++++++++++
 .../client/api/v5/V5SegmentLoadReporterTest.java   |  85 ++++++
 .../pulsar/common/scalable/SegmentLoadStats.java   |  46 +++
 19 files changed, 2310 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index ae4ec5777de..71ad014d7d8 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1366,6 +1366,152 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private int scalableTopicConsumerSessionGracePeriodSeconds = 60;
 
+    /**** --- Scalable topic auto split/merge (PIP-483). --- ****/
+    // These settings are the cluster-wide defaults that AutoScaleConfig.fromBrokerConfig() flattens
+    // into the policy read by the auto-scale evaluator. AutoScaleConfig.validated() rejects the
+    // resulting policy when minSegments < 1, maxSegments < minSegments, maxDagDepth < 0, a
+    // cooldown/window is negative, a split threshold is <= 0, a merge threshold is < 0, or a split
+    // threshold is not strictly greater than the matching merge threshold (the hysteresis band).
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_POLICIES,
+            doc = "Cluster-wide default for scalable-topic auto split/merge. 
When true, the controller "
+                    + "leader automatically splits hot segments and merges 
cold ones, within the caps "
+                    + "below. Can be overridden per-namespace and per-topic."
+    )
+    private boolean scalableTopicAutoScaleEnabled = true;
+
+    @FieldContext(
+            dynamic = false,
+            category = CATEGORY_POLICIES,
+            doc = "Cadence (seconds) of the controller's periodic 
traffic-driven auto split/merge "
+                    + "evaluation. Consumer-count changes are handled 
event-driven and are not affected "
+                    + "by this interval. Read when a controller wins 
leadership; not dynamic."
+    )
+    private int scalableTopicAutoScaleIntervalSeconds = 60;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_POLICIES,
+            doc = "Hard ceiling on the number of active segments a scalable 
topic can be auto-scaled to. "
+                    + "Splits stop firing once this is reached."
+    )
+    private int scalableTopicMaxSegments = 64;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_POLICIES,
+            doc = "Hard floor on the number of active segments. Merges stop 
firing once this is reached."
+    )
+    private int scalableTopicMinSegments = 1;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_POLICIES,
+            doc = "Max number of merges allowed in a segment's lineage. Once a 
segment reaches this depth "
+                    + "it stops being a merge candidate (load-driven splits 
are still allowed), bounding "
+                    + "split/merge flip-flopping."
+    )
+    private int scalableTopicMaxDagDepth = 10;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_POLICIES,
+            doc = "Minimum time (seconds) between automatic splits on a topic. 
Deliberately short — it "
+                    + "only coalesces a burst of near-simultaneous triggers 
(e.g. a consumer group "
+                    + "connecting at once)."
+    )
+    private int scalableTopicSplitCooldownSeconds = 60;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_POLICIES,
+            doc = "Minimum time (seconds) between automatic merges on a topic."
+    )
+    private int scalableTopicMergeCooldownSeconds = 300;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_POLICIES,
+            doc = "How long (seconds) a segment must continuously stay below 
every merge threshold before "
+                    + "it becomes merge-eligible."
+    )
+    private int scalableTopicMergeWindowSeconds = 300;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_POLICIES,
+            doc = "Inbound messages/second above which a segment is split."
+    )
+    private double scalableTopicSplitMsgRateInThreshold = 10_000;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_POLICIES,
+            doc = "Inbound bytes/second above which a segment is split."
+    )
+    private long scalableTopicSplitBytesRateInThreshold = 50_000_000L;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_POLICIES,
+            doc = "Outbound (dispatched) messages/second above which a segment 
is split."
+    )
+    private double scalableTopicSplitMsgRateOutThreshold = 50_000;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_POLICIES,
+            doc = "Outbound bytes/second above which a segment is split."
+    )
+    private long scalableTopicSplitBytesRateOutThreshold = 250_000_000L;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_POLICIES,
+            doc = "Inbound messages/second below which a segment counts as 
cold for merging."
+    )
+    private double scalableTopicMergeMsgRateInThreshold = 1_000;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_POLICIES,
+            doc = "Inbound bytes/second below which a segment counts as cold 
for merging."
+    )
+    private long scalableTopicMergeBytesRateInThreshold = 5_000_000L;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_POLICIES,
+            doc = "Outbound messages/second below which a segment counts as 
cold for merging."
+    )
+    private double scalableTopicMergeMsgRateOutThreshold = 5_000;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_POLICIES,
+            doc = "Outbound bytes/second below which a segment counts as cold 
for merging."
+    )
+    private long scalableTopicMergeBytesRateOutThreshold = 25_000_000L;
+
+    // A value <= 0 means the per-segment load reporter is never started
+    // (see BrokerService.startSegmentLoadReporter()).
+    @FieldContext(
+            dynamic = false,
+            category = CATEGORY_POLICIES,
+            doc = "Interval (seconds) at which the segment-owning broker 
samples its segment topics to "
+                    + "report load for auto split/merge. Read at broker start; 
not dynamic."
+    )
+    private int scalableTopicLoadReportIntervalSeconds = 10;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_POLICIES,
+            doc = "Minimum relative change in any segment rate (e.g. 0.25 = 
25%) since the last write that "
+                    + "triggers a new load record. Keeps metadata write volume 
bounded; a steady-state "
+                    + "segment writes once and goes quiet.\n"
+                    + "Note: the band is anchored at the last written value, 
not at the split/merge "
+                    + "thresholds. A rate that settles within the band of the 
last record is never "
+                    + "re-reported, so a segment can sustain up to this factor 
beyond a split/merge "
+                    + "threshold without triggering — the cost of bounded 
write volume. Lower the "
+                    + "threshold for tighter tracking at the price of more 
metadata writes."
+    )
+    private double scalableTopicLoadReportRateChangeThreshold = 0.25;
+
     @FieldContext(
             dynamic = false,
             category = CATEGORY_POLICIES,
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
index 5647feef52a..8d5999c061b 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
@@ -32,9 +32,11 @@ import java.util.stream.Collectors;
 import lombok.CustomLog;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.SegmentLoadStats;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.CacheGetResult;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
@@ -64,6 +66,8 @@ public class ScalableTopicResources extends 
BaseResources<ScalableTopicMetadata>
     private static final String SCALABLE_TOPIC_PATH = "/topics";
     private static final String SUBSCRIPTIONS_SEGMENT = "subscriptions";
     private static final String CONSUMERS_SEGMENT = "consumers";
+    private static final String SEGMENTS_SEGMENT = "segments";
+    private static final String LOAD_SEGMENT = "load";
 
     /**
      * Use the topic's {@code properties} map verbatim as the secondary-index 
entries.
@@ -77,6 +81,7 @@ public class ScalableTopicResources extends 
BaseResources<ScalableTopicMetadata>
 
     private final MetadataCache<SubscriptionMetadata> subscriptionCache;
     private final MetadataCache<ConsumerRegistration> 
consumerRegistrationCache;
+    private final MetadataCache<SegmentLoadStats> segmentLoadCache;
 
     /**
      * Per-path listeners for scalable-topic metadata events. Each listener 
watches a
@@ -107,6 +112,7 @@ public class ScalableTopicResources extends 
BaseResources<ScalableTopicMetadata>
         super(store, ScalableTopicMetadata.class, operationTimeoutSec);
         this.subscriptionCache = 
store.getMetadataCache(SubscriptionMetadata.class);
         this.consumerRegistrationCache = 
store.getMetadataCache(ConsumerRegistration.class);
+        this.segmentLoadCache = store.getMetadataCache(SegmentLoadStats.class);
         // Single shared metadata-store listener fans out to both per-path and
         // per-namespace subscribers. Per-subscriber lifecycle goes through the
         // register / deregister methods below.
@@ -260,7 +266,10 @@ public class ScalableTopicResources extends 
BaseResources<ScalableTopicMetadata>
     }
 
     public CompletableFuture<Void> deleteScalableTopicAsync(TopicName tn) {
-        return deleteAsync(topicPath(tn));
+        // Recursive delete: the topic record has children — the controller leader lock,
+        // the subscriptions (and their consumer registrations), and the per-segment load
+        // records — all of which must go with the topic.
+        // NOTE(review): this calls the store directly instead of going through deleteAsync();
+        // confirm nothing deleteAsync() provided (e.g. cache handling) is still required here.
+        return getStore().deleteRecursive(topicPath(tn));
     }
 
     public CompletableFuture<Boolean> scalableTopicExistsAsync(TopicName tn) {
@@ -432,6 +441,53 @@ public class ScalableTopicResources extends 
BaseResources<ScalableTopicMetadata>
     /**
      * Get the metadata store path for the controller leader lock.
      */
+    // --- Segment load records (PIP-483 auto split/merge) ---
+
+    /**
+     * Upsert a segment's load record. Written by the broker that owns the 
segment's
+     * {@code segment://} topic, only when the rates have changed materially 
since the last
+     * write (the materiality decision lives in {@code SegmentLoadReporter}).
+     *
+     * <p>An identical value is NOT rewritten: the record's {@code Stat} 
modification time is
+     * what the controller uses as "cold since" for the merge window, so a 
no-op rewrite —
+     * e.g. the first report after segment ownership moved to a broker with an 
empty
+     * last-written cache — would spuriously reset the window and starve 
merges under
+     * frequent rebalancing.
+     */
+    public CompletableFuture<Void> reportSegmentLoadAsync(TopicName tn, long 
segmentId,
+                                                          SegmentLoadStats 
stats) {
+        String path = segmentLoadPath(tn, segmentId);
+        return segmentLoadCache.get(path).thenCompose(existing -> {
+            if (existing.isPresent() && existing.get().equals(stats)) {
+                return CompletableFuture.completedFuture(null);
+            }
+            return segmentLoadCache.readModifyUpdateOrCreate(path, __ -> stats)
+                    .thenApply(__ -> null);
+        });
+    }
+
+    /**
+     * Read a segment's load record together with its metadata {@link Stat} — 
the controller's
+     * auto-scaling evaluator uses {@code stat.getModificationTimestamp()} to 
tell how long the
+     * segment has held its current load (the "cold for at least mergeWindow" 
check).
+     *
+     * @return the value and its stat, or empty if no record has been written 
yet
+     */
+    public CompletableFuture<Optional<CacheGetResult<SegmentLoadStats>>> 
getSegmentLoadAsync(
+            TopicName tn, long segmentId) {
+        return segmentLoadCache.getWithStats(segmentLoadPath(tn, segmentId));
+    }
+
+    /** Delete a segment's load record (best-effort; tolerates a missing 
record). */
+    public CompletableFuture<Void> deleteSegmentLoadAsync(TopicName tn, long 
segmentId) {
+        return segmentLoadCache.delete(segmentLoadPath(tn, segmentId))
+                .exceptionally(ignoreMissing());
+    }
+
+    public String segmentLoadPath(TopicName tn, long segmentId) {
+        return joinPath(topicPath(tn), SEGMENTS_SEGMENT, 
Long.toString(segmentId), LOAD_SEGMENT);
+    }
+
     public String controllerLockPath(TopicName tn) {
         return joinPath(topicPath(tn), "controller");
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index f702ffc1afe..fe4296710e1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -138,6 +138,7 @@ import 
org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryCla
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.persistent.SystemTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
+import org.apache.pulsar.broker.service.scalable.SegmentLoadReporter;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -186,6 +187,8 @@ import org.apache.pulsar.common.policies.data.TopicType;
 import 
org.apache.pulsar.common.policies.data.impl.AutoSubscriptionCreationOverrideImpl;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.scalable.SegmentLoadStats;
+import org.apache.pulsar.common.scalable.SegmentTopicName;
 import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FieldParser;
@@ -286,6 +289,9 @@ public class BrokerService implements Closeable {
     private final SingleThreadNonConcurrentFixedRateScheduler 
compactionMonitor;
     private final SingleThreadNonConcurrentFixedRateScheduler 
consumedLedgersMonitor;
     private SingleThreadNonConcurrentFixedRateScheduler 
deduplicationSnapshotMonitor;
+    /** PIP-483: periodic sweep that writes per-segment load records for auto 
split/merge. */
+    private SingleThreadNonConcurrentFixedRateScheduler 
segmentLoadReporterMonitor;
+    private SegmentLoadReporter segmentLoadReporter;
     protected final PublishRateLimiter brokerPublishRateLimiter;
     private final DispatchRateLimiterFactory dispatchRateLimiterFactory;
     protected volatile DispatchRateLimiter brokerDispatchRateLimiter = null;
@@ -683,9 +689,69 @@ public class BrokerService implements Closeable {
         this.updateBrokerDispatchThrottlingMaxRate();
         this.startCheckReplicationPolicies();
         this.startDeduplicationSnapshotMonitor();
+        this.startSegmentLoadReporter();
         this.startClearInvalidateTopicNameCacheTask();
     }
 
+    /**
+     * Start the periodic per-segment load reporter (PIP-483). On each tick 
this broker sweeps
+     * the {@code segment://} topics it currently hosts, computes their 
ingest/dispatch rates,
+     * and writes a {@link SegmentLoadStats} record to the metadata store — 
but only when a rate
+     * changed materially since the last write (see {@link 
SegmentLoadReporter}). The controller
+     * leader reads these records to drive auto split/merge.
+     */
+    protected void startSegmentLoadReporter() {
+        ServiceConfiguration conf = pulsar().getConfiguration();
+        if (!conf.isScalableTopicsEnabled()) {
+            return;
+        }
+        var resources = 
pulsar().getPulsarResources().getScalableTopicResources();
+        if (resources == null) {
+            return;
+        }
+        int interval = conf.getScalableTopicLoadReportIntervalSeconds();
+        if (interval <= 0) {
+            return;
+        }
+        this.segmentLoadReporter = new SegmentLoadReporter(resources,
+                () -> 
pulsar().getConfiguration().getScalableTopicLoadReportRateChangeThreshold());
+        this.segmentLoadReporterMonitor =
+                new 
SingleThreadNonConcurrentFixedRateScheduler("scalable-segment-load-reporter");
+        segmentLoadReporterMonitor.scheduleAtFixedRateNonConcurrently(
+                () -> forEachTopic(this::reportSegmentLoad), interval, 
interval, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Test hook: performs one load-report sweep over the topics (the same
+     * {@code forEachTopic(this::reportSegmentLoad)} call the periodic task makes) without
+     * waiting for the scheduler. Does nothing useful if the reporter was never started, since
+     * {@code reportSegmentLoad} returns early when no reporter is set.
+     */
+    @VisibleForTesting
+    public void runSegmentLoadReportOnceForTest() {
+        forEachTopic(this::reportSegmentLoad);
+    }
+
+    private void reportSegmentLoad(Topic topic) {
+        SegmentLoadReporter reporter = this.segmentLoadReporter;
+        if (reporter == null) {
+            return;
+        }
+        TopicName topicName = TopicName.get(topic.getName());
+        if (topicName.getDomain() != TopicDomain.segment) {
+            return;
+        }
+        try {
+            TopicName parent = SegmentTopicName.getParentTopicName(topicName);
+            long segmentId = SegmentTopicName.getSegmentId(topicName);
+            var stats = topic.getStats(false, false, false);
+            SegmentLoadStats load = new SegmentLoadStats(
+                    stats.msgRateIn, stats.msgThroughputIn, stats.msgRateOut, 
stats.msgThroughputOut);
+            reporter.reportIfChanged(parent, segmentId, load).exceptionally(ex 
-> {
+                log.debug().attr("segment", topicName).exceptionMessage(ex)
+                        .log("Failed to report segment load");
+                return null;
+            });
+        } catch (Exception e) {
+            log.debug().attr("segment", topicName).exceptionMessage(e)
+                    .log("Failed to sample segment load");
+        }
+    }
+
     protected void startClearInvalidateTopicNameCacheTask() {
         final int maxSecondsToClearTopicNameCache = 
pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache();
         inactivityMonitor.scheduleAtFixedRateNonConcurrently(
@@ -967,7 +1033,8 @@ public class BrokerService implements Closeable {
                                                 consumedLedgersMonitor,
                                                 backlogQuotaChecker,
                                                 topicOrderedExecutor,
-                                                deduplicationSnapshotMonitor)
+                                                deduplicationSnapshotMonitor,
+                                                segmentLoadReporterMonitor)
                                         .handle());
 
                                 CompletableFuture<Void> combined =
@@ -2707,9 +2774,34 @@ public class BrokerService implements Closeable {
         if (compactor != null) {
             compactor.getStats().removeTopic(topic);
         }
+        forgetSegmentLoad(topic);
         topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD, 
EventStage.SUCCESS);
     }
 
+    /**
+     * Drop the load reporter's last-written cache entry for a segment topic 
this broker no
+     * longer owns (unload / delete). Without this the cache grows unboundedly 
with segment
+     * churn, and on a later re-acquire the first sample could be wrongly 
suppressed as
+     * immaterial.
+     */
+    private void forgetSegmentLoad(String topic) {
+        SegmentLoadReporter reporter = this.segmentLoadReporter;
+        if (reporter == null) {
+            return;
+        }
+        TopicName topicName = TopicName.get(topic);
+        if (topicName.getDomain() != TopicDomain.segment) {
+            return;
+        }
+        try {
+            reporter.forget(SegmentTopicName.getParentTopicName(topicName),
+                    SegmentTopicName.getSegmentId(topicName));
+        } catch (Exception e) {
+            log.debug().attr("segment", topicName).exceptionMessage(e)
+                    .log("Failed to forget segment load cache entry");
+        }
+    }
+
     public long getNumberOfNamespaceBundles() {
         this.numberOfNamespaceBundles = 0;
         this.multiLayerTopicsMap.forEach((namespaceName, bundles) -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java
new file mode 100644
index 00000000000..b739b102a76
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.time.Duration;
+import lombok.Builder;
+import org.apache.pulsar.broker.ServiceConfiguration;
+
+/**
+ * Fully-resolved auto split/merge policy for a single scalable topic 
(PIP-483).
+ *
+ * <p>This is the flattened result of merging broker config defaults with any 
namespace and
+ * topic overrides. The {@link AutoScalePolicyEvaluator} reads it directly — 
it never sees
+ * the partial override objects or the broker config.
+ *
+ * <p>All thresholds are absolute (msg/s and bytes/s). Split thresholds must 
sit strictly
+ * above the corresponding merge thresholds: the dead-band between them is the 
hysteresis
+ * that prevents a just-merged segment from immediately re-qualifying for a 
split.
+ *
+ * @param enabled           whether auto split/merge is active for this topic; 
when false the
+ *                          evaluator always returns {@code NoAction}
+ * @param maxSegments       hard ceiling on active segments; splits stop once 
reached
+ * @param minSegments       hard floor on active segments; merges stop once 
reached
+ * @param maxDagDepth       max merges allowed in a segment's lineage; a pair 
is merge-eligible
+ *                          only while neither side has reached this depth 
(splits are unaffected)
+ * @param splitCooldown     minimum time between automatic splits on the 
topic; short, only to
+ *                          coalesce a burst of near-simultaneous triggers
+ * @param mergeCooldown     minimum time between automatic merges on the topic
+ * @param mergeWindow       how long a segment must continuously stay below 
every merge threshold
+ *                          before it becomes merge-eligible (measured from 
the load record's
+ *                          metadata-store last-modified time)
+ * @param splitMsgRateIn    inbound msg/s above which a segment is split
+ * @param splitBytesRateIn  inbound bytes/s above which a segment is split
+ * @param splitMsgRateOut   outbound (dispatched) msg/s above which a segment 
is split
+ * @param splitBytesRateOut outbound bytes/s above which a segment is split
+ * @param mergeMsgRateIn    inbound msg/s below which a segment counts as cold 
for merging
+ * @param mergeBytesRateIn  inbound bytes/s below which a segment counts as 
cold for merging
+ * @param mergeMsgRateOut   outbound msg/s below which a segment counts as 
cold for merging
+ * @param mergeBytesRateOut outbound bytes/s below which a segment counts as 
cold for merging
+ */
+@Builder(toBuilder = true)
+public record AutoScaleConfig(
+        boolean enabled,
+        int maxSegments,
+        int minSegments,
+        int maxDagDepth,
+        Duration splitCooldown,
+        Duration mergeCooldown,
+        Duration mergeWindow,
+        double splitMsgRateIn,
+        double splitBytesRateIn,
+        double splitMsgRateOut,
+        double splitBytesRateOut,
+        double mergeMsgRateIn,
+        double mergeBytesRateIn,
+        double mergeMsgRateOut,
+        double mergeBytesRateOut
+) {
+
+    /**
+     * Build the cluster-wide default policy from broker configuration. Per-namespace and
+     * per-topic overrides (when added) are layered on top of this via {@code toBuilder()}.
+     *
+     * @param conf the broker service configuration
+     * @return the resolved policy reflecting the {@code scalableTopic*} settings
+     * @throws IllegalArgumentException if the configured values violate an invariant checked
+     *                                  by {@link #validated()}
+     */
+    public static AutoScaleConfig fromBrokerConfig(ServiceConfiguration conf) {
+        return AutoScaleConfig.builder()
+                .enabled(conf.isScalableTopicAutoScaleEnabled())
+                .maxSegments(conf.getScalableTopicMaxSegments())
+                .minSegments(conf.getScalableTopicMinSegments())
+                .maxDagDepth(conf.getScalableTopicMaxDagDepth())
+                .splitCooldown(Duration.ofSeconds(conf.getScalableTopicSplitCooldownSeconds()))
+                .mergeCooldown(Duration.ofSeconds(conf.getScalableTopicMergeCooldownSeconds()))
+                .mergeWindow(Duration.ofSeconds(conf.getScalableTopicMergeWindowSeconds()))
+                .splitMsgRateIn(conf.getScalableTopicSplitMsgRateInThreshold())
+                .splitBytesRateIn(conf.getScalableTopicSplitBytesRateInThreshold())
+                .splitMsgRateOut(conf.getScalableTopicSplitMsgRateOutThreshold())
+                .splitBytesRateOut(conf.getScalableTopicSplitBytesRateOutThreshold())
+                .mergeMsgRateIn(conf.getScalableTopicMergeMsgRateInThreshold())
+                .mergeBytesRateIn(conf.getScalableTopicMergeBytesRateInThreshold())
+                .mergeMsgRateOut(conf.getScalableTopicMergeMsgRateOutThreshold())
+                .mergeBytesRateOut(conf.getScalableTopicMergeBytesRateOutThreshold())
+                .build()
+                .validated();
+    }
+
+    /**
+     * Validate the invariants the evaluator depends on; returns {@code this} for chaining.
+     *
+     * <p>In particular every split threshold must be strictly positive — the evaluator
+     * scores overload as {@code rate / splitThreshold}, and a zero threshold would make any
+     * positive rate score {@code Infinity} (permanent split pressure) while a zero rate
+     * scores {@code NaN} (silently ignored). Catching misconfiguration here surfaces a
+     * clear error at the policy-resolution layer instead.
+     *
+     * <p>The three durations must also be present: a policy assembled through the builder
+     * with a duration left unset carries {@code null} there, and that is reported as an
+     * {@link IllegalArgumentException} like every other violation rather than surfacing as a
+     * {@link NullPointerException}.
+     *
+     * @return this instance, unchanged
+     * @throws IllegalArgumentException if any invariant is violated
+     */
+    public AutoScaleConfig validated() {
+        check(minSegments >= 1, "minSegments must be >= 1");
+        check(maxSegments >= minSegments, "maxSegments must be >= minSegments");
+        check(maxDagDepth >= 0, "maxDagDepth must be >= 0");
+        // Presence first: the isNegative() checks below would otherwise throw an NPE.
+        check(splitCooldown != null, "splitCooldown must be set");
+        check(mergeCooldown != null, "mergeCooldown must be set");
+        check(mergeWindow != null, "mergeWindow must be set");
+        check(!splitCooldown.isNegative(), "splitCooldown must not be negative");
+        check(!mergeCooldown.isNegative(), "mergeCooldown must not be negative");
+        check(!mergeWindow.isNegative(), "mergeWindow must not be negative");
+        // Comparisons are written so that a NaN threshold fails the check.
+        check(splitMsgRateIn > 0, "splitMsgRateInThreshold must be > 0");
+        check(splitBytesRateIn > 0, "splitBytesRateInThreshold must be > 0");
+        check(splitMsgRateOut > 0, "splitMsgRateOutThreshold must be > 0");
+        check(splitBytesRateOut > 0, "splitBytesRateOutThreshold must be > 0");
+        check(mergeMsgRateIn >= 0, "mergeMsgRateInThreshold must be >= 0");
+        check(mergeBytesRateIn >= 0, "mergeBytesRateInThreshold must be >= 0");
+        check(mergeMsgRateOut >= 0, "mergeMsgRateOutThreshold must be >= 0");
+        check(mergeBytesRateOut >= 0, "mergeBytesRateOutThreshold must be >= 0");
+        // Hysteresis: each split threshold sits strictly above its merge counterpart.
+        check(splitMsgRateIn > mergeMsgRateIn,
+                "splitMsgRateInThreshold must be > mergeMsgRateInThreshold (hysteresis)");
+        check(splitBytesRateIn > mergeBytesRateIn,
+                "splitBytesRateInThreshold must be > mergeBytesRateInThreshold (hysteresis)");
+        check(splitMsgRateOut > mergeMsgRateOut,
+                "splitMsgRateOutThreshold must be > mergeMsgRateOutThreshold (hysteresis)");
+        check(splitBytesRateOut > mergeBytesRateOut,
+                "splitBytesRateOutThreshold must be > mergeBytesRateOutThreshold (hysteresis)");
+        return this;
+    }
+
+    /**
+     * Throws an {@link IllegalArgumentException} carrying {@code message} when
+     * {@code condition} is false.
+     */
+    private static void check(boolean condition, String message) {
+        if (!condition) {
+            throw new IllegalArgumentException("Invalid auto split/merge configuration: " + message);
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleDecision.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleDecision.java
new file mode 100644
index 00000000000..e8a1713e1f0
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleDecision.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+/**
+ * The outcome of one {@link AutoScalePolicyEvaluator} evaluation (PIP-483): split one
+ * segment, merge two adjacent segments, or do nothing. Each non-{@link NoAction} variant
+ * carries a short {@code reason} string used for logging and metrics.
+ *
+ * <p>The hierarchy is sealed: {@link Split}, {@link Merge} and {@link NoAction} are the only
+ * permitted implementations.
+ */
+public sealed interface AutoScaleDecision
+        permits AutoScaleDecision.Split, AutoScaleDecision.Merge, AutoScaleDecision.NoAction {
+
+    /**
+     * Split {@code segmentId} at its midpoint.
+     *
+     * @param segmentId the segment to split
+     * @param reason    short label describing which rule triggered the split
+     */
+    record Split(long segmentId, String reason) implements AutoScaleDecision {
+    }
+
+    /**
+     * Merge the two adjacent active segments {@code segmentId1} and {@code segmentId2}.
+     *
+     * @param segmentId1 one side of the adjacent pair
+     * @param segmentId2 the other side of the adjacent pair
+     * @param reason     short label describing which rule triggered the merge
+     */
+    record Merge(long segmentId1, long segmentId2, String reason) implements AutoScaleDecision {
+    }
+
+    /** No action this evaluation. */
+    record NoAction() implements AutoScaleDecision {
+    }
+
+    /** Shared {@link NoAction} instance returned whenever an evaluation decides to do nothing. */
+    NoAction NONE = new NoAction();
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluator.java
new file mode 100644
index 00000000000..bcb48ffd6a9
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluator.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.apache.pulsar.common.scalable.SegmentLoadStats;
+
+/**
+ * Pure, side-effect-free decision function for scalable-topic auto split/merge (PIP-483).
+ *
+ * <p>Given a snapshot of the current layout, per-segment load samples, per-subscription
+ * stream/checkpoint consumer counts, the resolved policy, and the current time, it returns
+ * exactly one {@link AutoScaleDecision}. It performs no I/O and holds no state — the caller
+ * (the controller leader) collects the inputs and dispatches the result.
+ *
+ * <p>It runs two passes and emits at most one action:
+ * <ol>
+ *   <li><b>Split</b> (fast, lightly coalesced by {@code splitCooldown}): consumer-count
+ *       scale-up first, then traffic-driven scale-up.</li>
+ *   <li><b>Merge</b> (lazy, gated by {@code mergeCooldown} + {@code mergeWindow} +
+ *       {@code maxDagDepth}): only if no split fired, and only if the layout left by the
+ *       merge would still have at least as many active segments as the largest managed
+ *       subscription has consumers.</li>
+ * </ol>
+ */
+public final class AutoScalePolicyEvaluator {
+
+    /** Labels of the four split metrics, in the order their threshold ratios are computed. */
+    private static final String[] SPLIT_METRICS =
+            {"msgRateIn", "bytesRateIn", "msgRateOut", "bytesRateOut"};
+
+    private AutoScalePolicyEvaluator() {
+    }
+
+    /**
+     * Decide whether to split, merge, or do nothing.
+     *
+     * @param layout              the current segment layout
+     * @param loadBySegment       per active-segment load sample; a missing entry is treated
+     *                            as zero load with no age (never merge-eligible)
+     * @param streamConsumerCount per-subscription count of STREAM/CHECKPOINT (controller-managed)
+     *                            consumers; QUEUE subscriptions are excluded by the caller
+     * @param config              the resolved policy
+     * @param nowMs               current wall-clock time, epoch millis
+     * @param lastSplitAtMs       epoch millis of the last split on this topic (manual or auto),
+     *                            or {@code Long.MIN_VALUE} if none
+     * @param lastMergeAtMs       epoch millis of the last merge on this topic (manual or auto),
+     *                            or {@code Long.MIN_VALUE} if none
+     * @return the decision
+     */
+    public static AutoScaleDecision decide(
+            SegmentLayout layout,
+            Map<Long, SegmentLoadSample> loadBySegment,
+            Map<String, Integer> streamConsumerCount,
+            AutoScaleConfig config,
+            long nowMs,
+            long lastSplitAtMs,
+            long lastMergeAtMs) {
+
+        if (!config.enabled()) {
+            return AutoScaleDecision.NONE;
+        }
+
+        List<SegmentInfo> active = new ArrayList<>(layout.getActiveSegments().values());
+
+        // Largest consumer count across the managed subscriptions; 0 when there are none.
+        int requiredConsumers = streamConsumerCount.values().stream()
+                .mapToInt(Integer::intValue).max().orElse(0);
+
+        AutoScaleDecision split = trySplit(active, loadBySegment, requiredConsumers,
+                config, nowMs, lastSplitAtMs);
+        if (!(split instanceof AutoScaleDecision.NoAction)) {
+            return split;
+        }
+
+        // A merge leaves active.size() - 1 segments. If that is fewer than the largest managed
+        // subscription needs, the consumer-count split rule would undo the merge on a later
+        // evaluation, so the two rules would flap against each other (sealing segments and
+        // reassigning consumers each round) until maxDagDepth finally stops it. Skip the merge.
+        if (active.size() - 1 < requiredConsumers) {
+            return AutoScaleDecision.NONE;
+        }
+
+        return tryMerge(active, layout, loadBySegment, config, nowMs, lastMergeAtMs);
+    }
+
+    // --- Split pass ---
+
+    /**
+     * Split pass: returns a {@link AutoScaleDecision.Split} when the consumer-count rule or a
+     * load threshold calls for one, {@link AutoScaleDecision#NONE} otherwise (including when
+     * the segment cap is reached or the split cooldown is still running).
+     */
+    private static AutoScaleDecision trySplit(
+            List<SegmentInfo> active,
+            Map<Long, SegmentLoadSample> loadBySegment,
+            int requiredConsumers,
+            AutoScaleConfig config,
+            long nowMs,
+            long lastSplitAtMs) {
+
+        if (active.size() >= config.maxSegments()) {
+            return AutoScaleDecision.NONE;
+        }
+        if (withinCooldown(nowMs, lastSplitAtMs, config.splitCooldown().toMillis())) {
+            return AutoScaleDecision.NONE;
+        }
+
+        // (a) Consumer-driven: per-subscription max. If any managed subscription has more
+        // consumers than there are active segments, add a segment so the 1:1 assignment can
+        // give the extra consumer its own segment. Split the busiest segment by msgRateIn so
+        // the new pair lands where it relieves the most ingest.
+        if (requiredConsumers > active.size()) {
+            SegmentInfo target = busiestByMsgRateIn(active, loadBySegment);
+            if (target != null) {
+                return new AutoScaleDecision.Split(target.segmentId(), "consumer-count");
+            }
+        }
+
+        // (b) Load-driven: split the segment with the highest overload score among those over
+        // at least one split threshold.
+        SegmentInfo hottest = null;
+        // Starting at 1.0 means only a per-metric ratio strictly over its threshold can win.
+        double hottestScore = 1.0;
+        String hottestReason = null;
+        for (SegmentInfo segment : active) {
+            SegmentLoadStats stats = statsOf(segment.segmentId(), loadBySegment);
+            double[] ratios = {
+                    stats.msgRateIn() / config.splitMsgRateIn(),
+                    stats.bytesRateIn() / config.splitBytesRateIn(),
+                    stats.msgRateOut() / config.splitMsgRateOut(),
+                    stats.bytesRateOut() / config.splitBytesRateOut(),
+            };
+            // The segment's score is its worst (largest) ratio; the first metric wins ties.
+            double score = 0.0;
+            String reason = null;
+            for (int i = 0; i < ratios.length; i++) {
+                if (ratios[i] > score) {
+                    score = ratios[i];
+                    reason = SPLIT_METRICS[i];
+                }
+            }
+            if (score > hottestScore) {
+                hottestScore = score;
+                hottest = segment;
+                hottestReason = reason;
+            }
+        }
+        if (hottest != null) {
+            return new AutoScaleDecision.Split(hottest.segmentId(), hottestReason);
+        }
+
+        return AutoScaleDecision.NONE;
+    }
+
+    // --- Merge pass ---
+
+    /**
+     * Merge pass: among all adjacent active pairs that are both under the DAG-depth cap and
+     * durably cold, pick the pair with the lowest combined rate. Returns
+     * {@link AutoScaleDecision#NONE} when no pair qualifies, the layout is already at
+     * {@code minSegments}, or the merge cooldown is still running.
+     */
+    private static AutoScaleDecision tryMerge(
+            List<SegmentInfo> active,
+            SegmentLayout layout,
+            Map<Long, SegmentLoadSample> loadBySegment,
+            AutoScaleConfig config,
+            long nowMs,
+            long lastMergeAtMs) {
+
+        if (active.size() <= config.minSegments()) {
+            return AutoScaleDecision.NONE;
+        }
+        if (withinCooldown(nowMs, lastMergeAtMs, config.mergeCooldown().toMillis())) {
+            return AutoScaleDecision.NONE;
+        }
+
+        long mergeWindowMs = config.mergeWindow().toMillis();
+
+        AutoScaleDecision.Merge coldest = null;
+        double coldestCombined = Double.MAX_VALUE;
+        for (int i = 0; i < active.size(); i++) {
+            for (int j = i + 1; j < active.size(); j++) {
+                SegmentInfo a = active.get(i);
+                SegmentInfo b = active.get(j);
+                if (!a.hashRange().isAdjacentTo(b.hashRange())) {
+                    continue;
+                }
+                if (layout.mergeDepth(a.segmentId()) >= config.maxDagDepth()
+                        || layout.mergeDepth(b.segmentId()) >= config.maxDagDepth()) {
+                    continue;
+                }
+                if (!coldEnough(a.segmentId(), loadBySegment, config, nowMs, mergeWindowMs)
+                        || !coldEnough(b.segmentId(), loadBySegment, config, nowMs,
+                                mergeWindowMs)) {
+                    continue;
+                }
+                double combined = combinedRate(a.segmentId(), loadBySegment)
+                        + combinedRate(b.segmentId(), loadBySegment);
+                if (combined < coldestCombined) {
+                    coldestCombined = combined;
+                    coldest = new AutoScaleDecision.Merge(a.segmentId(), b.segmentId(), "cold");
+                }
+            }
+        }
+        return coldest != null ? coldest : AutoScaleDecision.NONE;
+    }
+
+    /**
+     * A segment is cold enough to merge only if it has a load record that has stayed below
+     * every merge threshold for at least {@code mergeWindowMs}. A missing record means we
+     * have no evidence the segment is durably cold, so it is never merge-eligible.
+     *
+     * <p>Note that {@code nowMs} is the controller broker's clock while the sample's
+     * {@code modifiedAtMs} is the metadata store's server-side timestamp; clock skew between
+     * the two shifts the effective window. Acceptable for a lazy-merge heuristic — skew is
+     * normally seconds against a multi-minute window.
+     */
+    private static boolean coldEnough(long segmentId, Map<Long, SegmentLoadSample> loadBySegment,
+                                      AutoScaleConfig config, long nowMs, long mergeWindowMs) {
+        SegmentLoadSample sample = loadBySegment.get(segmentId);
+        if (sample == null) {
+            return false;
+        }
+        if (nowMs - sample.modifiedAtMs() < mergeWindowMs) {
+            return false;
+        }
+        SegmentLoadStats stats = sample.stats();
+        return stats.msgRateIn() < config.mergeMsgRateIn()
+                && stats.bytesRateIn() < config.mergeBytesRateIn()
+                && stats.msgRateOut() < config.mergeMsgRateOut()
+                && stats.bytesRateOut() < config.mergeBytesRateOut();
+    }
+
+    // --- Helpers ---
+
+    /** True when an operation happened at {@code lastAtMs} and its cooldown has not elapsed. */
+    private static boolean withinCooldown(long nowMs, long lastAtMs, long cooldownMs) {
+        return lastAtMs != Long.MIN_VALUE && nowMs - lastAtMs < cooldownMs;
+    }
+
+    /** Load stats for a segment, or {@link SegmentLoadStats#ZERO} when no sample exists. */
+    private static SegmentLoadStats statsOf(long segmentId, Map<Long, SegmentLoadSample> load) {
+        SegmentLoadSample sample = load.get(segmentId);
+        return sample != null ? sample.stats() : SegmentLoadStats.ZERO;
+    }
+
+    /** Sum of the four rates; used only to rank merge candidates against each other. */
+    private static double combinedRate(long segmentId, Map<Long, SegmentLoadSample> load) {
+        SegmentLoadStats s = statsOf(segmentId, load);
+        return s.msgRateIn() + s.bytesRateIn() + s.msgRateOut() + s.bytesRateOut();
+    }
+
+    /** The active segment with the highest msgRateIn, or {@code null} if there are none. */
+    private static SegmentInfo busiestByMsgRateIn(List<SegmentInfo> active,
+                                                  Map<Long, SegmentLoadSample> load) {
+        SegmentInfo best = null;
+        double bestRate = -1.0;
+        for (SegmentInfo segment : active) {
+            double rate = statsOf(segment.segmentId(), load).msgRateIn();
+            // Tie-break deterministically on segment id so the choice is stable across ticks.
+            if (rate > bestRate || (rate == bestRate && best != null
+                    && segment.segmentId() < best.segmentId())) {
+                bestRate = rate;
+                best = segment;
+            }
+        }
+        return best;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index c5501db56e2..0b8c70d66b1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.scalable;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.github.merlimat.slog.Logger;
 import java.time.Clock;
 import java.time.Duration;
@@ -31,8 +32,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Getter;
 import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
 import org.apache.pulsar.broker.resources.ScalableTopicResources;
 import org.apache.pulsar.broker.service.BrokerService;
@@ -84,6 +87,27 @@ public class ScalableTopicController {
     /** Sealed-segment GC scheduled task. Non-null only while this broker is 
leader. */
     private volatile ScheduledFuture<?> gcTask;
 
+    /** Periodic auto split/merge evaluation task (PIP-483). Non-null only while leader. */
+    private volatile ScheduledFuture<?> autoScaleTask;
+
+    /**
+     * Serializes auto split/merge: an evaluation acquires this before deciding and holds it
+     * for the whole split/merge it dispatches, so concurrent ticks / consumer-change triggers
+     * never launch overlapping auto operations.
+     */
+    private final AtomicBoolean autoScaleInFlight = new AtomicBoolean(false);
+
+    /**
+     * Set when a trigger arrives while an evaluation is in flight; the in-flight run
+     * re-evaluates once on completion so coalesced triggers are not lost until the next tick.
+     */
+    private final AtomicBoolean autoScaleReEvaluate = new AtomicBoolean(false);
+
+    /** Epoch millis of the last split on this topic (manual or auto); MIN_VALUE if none. */
+    private volatile long lastSplitAtMs = Long.MIN_VALUE;
+    /** Epoch millis of the last merge on this topic (manual or auto); MIN_VALUE if none. */
+    private volatile long lastMergeAtMs = Long.MIN_VALUE;
+
     @Getter
     private volatile LeaderElectionState leaderState = 
LeaderElectionState.NoLeader;
 
@@ -126,10 +150,11 @@ public class ScalableTopicController {
     private void onLeaderStateChange(LeaderElectionState state) {
         log.info().attr("state", state).log("Leader state change for scalable 
topic");
         if (state != LeaderElectionState.Leading) {
-            // Stepped down (or never was leader). Stop the GC tick so the 
deposed leader
-            // doesn't race the new one on layout writes / backing-topic 
deletes. The new
-            // leader's initialize() will reschedule.
+            // Stepped down (or never was leader). Stop the GC and auto-scale 
ticks so the
+            // deposed leader doesn't race the new one on layout writes / 
backing-topic
+            // deletes. The new leader's initialize() will reschedule.
             cancelGcTask();
+            cancelAutoScaleTask();
         }
         if (state == LeaderElectionState.NoLeader && !closed) {
             initialize().exceptionally(ex -> {
@@ -160,7 +185,9 @@ public class ScalableTopicController {
                 })
                 .thenCompose(__ -> {
                     if (isLeader()) {
+                        seedAutoScaleCooldownsFromLayout();
                         scheduleGcTask();
+                        scheduleAutoScaleTask();
                         return ensureActiveSegmentsExist()
                                 .thenCompose(___ -> 
restoreSessionsFromStore());
                     }
@@ -168,6 +195,29 @@ public class ScalableTopicController {
                 });
     }
 
+    /**
+     * Rebuild the in-memory split/merge cooldown timestamps from the layout after winning
+     * leadership. The layout records when each segment was created, and the number of parents
+     * identifies the operation that created it: one parent means a split, two or more mean a
+     * merge. The newest creation time in each class is therefore the time of the last split /
+     * merge. Without this, a leader failover would reset both cooldowns and e.g. allow an auto
+     * merge seconds after one just ran on the previous leader.
+     */
+    private void seedAutoScaleCooldownsFromLayout() {
+        long latestSplit = Long.MIN_VALUE;
+        long latestMerge = Long.MIN_VALUE;
+        for (SegmentInfo info : currentLayout.getAllSegments().values()) {
+            int parentCount = info.parentIds().size();
+            if (parentCount == 0) {
+                // Root segment: created with the topic, not by a split or merge.
+                continue;
+            }
+            long createdAt = info.createdAtMs();
+            if (parentCount == 1) {
+                latestSplit = Math.max(latestSplit, createdAt);
+            } else {
+                latestMerge = Math.max(latestMerge, createdAt);
+            }
+        }
+        lastSplitAtMs = latestSplit;
+        lastMergeAtMs = latestMerge;
+    }
+
     /**
      * Recovery path for active segments whose backing topics are missing — 
e.g.,
      * a {@code createScalableTopic} call that committed metadata but failed to
@@ -237,6 +287,186 @@ public class ScalableTopicController {
         }
     }
 
+    // --- Auto split/merge (PIP-483) ---
+
+    /**
+     * Start the periodic, traffic-driven auto split/merge evaluation. Does nothing when the
+     * controller is closed, a task is already scheduled, no broker configuration is
+     * available, or the configured interval is not positive. Cancelled on close and on
+     * leadership loss. Consumer-count changes are handled event-driven (see
+     * {@link #onConsumerCountChanged()}), not by this tick.
+     *
+     * <p>The tick is scheduled even while auto-scaling is disabled: the enabled flag is
+     * re-read on every evaluation, so turning it on takes effect at the next tick instead of
+     * waiting for a leadership cycle. A disabled tick is a cheap no-op.
+     */
+    private synchronized void scheduleAutoScaleTask() {
+        if (closed) {
+            return;
+        }
+        if (autoScaleTask != null) {
+            return;
+        }
+        ServiceConfiguration brokerConf = brokerConfig();
+        if (brokerConf == null) {
+            return;
+        }
+        Duration interval = Duration.ofSeconds(brokerConf.getScalableTopicAutoScaleIntervalSeconds());
+        long periodMs = interval.toMillis();
+        if (periodMs > 0) {
+            autoScaleTask = scheduler().scheduleAtFixedRate(
+                    () -> runAutoScaleSafely("tick"), periodMs, periodMs, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /** Stop the periodic auto split/merge tick, if one is scheduled, without interrupting it. */
+    private synchronized void cancelAutoScaleTask() {
+        ScheduledFuture<?> task = autoScaleTask;
+        if (task == null) {
+            return;
+        }
+        task.cancel(false);
+        autoScaleTask = null;
+    }
+
+    /**
+     * Event-driven trigger: a stream/checkpoint consumer registered or unregistered, which
+     * may change the per-subscription consumer count. Runs an evaluation immediately, with
+     * trigger label {@code "consumer-change"}, so the consumer-count split rule is considered
+     * without waiting for the periodic tick.
+     */
+    private void onConsumerCountChanged() {
+        runAutoScaleSafely("consumer-change");
+    }
+
+    /**
+     * Run one auto split/merge evaluation and never let a failure escape: both a synchronous
+     * throw and an exceptionally completed future are logged at WARN and dropped. A no-op
+     * unless this controller is the leader and still open.
+     *
+     * @param trigger label of what caused this evaluation, attached to log records
+     */
+    private void runAutoScaleSafely(String trigger) {
+        boolean mayRun = isLeader() && !closed;
+        if (!mayRun) {
+            return;
+        }
+        try {
+            CompletableFuture<Void> evaluation = evaluateAndAct(trigger);
+            evaluation.exceptionally(failure -> {
+                log.warn().attr("trigger", trigger).exceptionMessage(failure)
+                        .log("Auto split/merge evaluation failed");
+                return null;
+            });
+        } catch (Throwable thrown) {
+            log.warn().attr("trigger", trigger).exception(thrown)
+                    .log("Auto split/merge evaluation threw");
+        }
+    }
+
+    /**
+     * Collect the current inputs, run the pure {@link AutoScalePolicyEvaluator}, and dispatch
+     * the resulting action. At most one auto operation runs at a time: {@link #autoScaleInFlight}
+     * is held from before the decision through the end of the dispatched split/merge.
+     *
+     * <p>A trigger that arrives while another evaluation is in flight is not dropped: it sets
+     * {@link #autoScaleReEvaluate}, and exactly one follow-up evaluation (trigger label
+     * {@code "coalesced"}) is scheduled once the in-flight run finishes.
+     *
+     * @param trigger label of what caused this evaluation, attached to log records
+     * @return a future that completes when the evaluation and any dispatched split/merge are
+     *         done; already completed when auto-scaling is disabled, no broker configuration
+     *         is available, or another evaluation is in flight
+     */
+    private CompletableFuture<Void> evaluateAndAct(String trigger) {
+        ServiceConfiguration brokerConfig = brokerConfig();
+        if (brokerConfig == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        AutoScaleConfig config = AutoScaleConfig.fromBrokerConfig(brokerConfig);
+        if (!config.enabled()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        if (!autoScaleInFlight.compareAndSet(false, true)) {
+            // Another evaluation or auto operation is already running. Don't drop the
+            // trigger: mark it pending so the in-flight run re-evaluates on completion —
+            // e.g. a consumer registering mid-evaluation would otherwise not be considered
+            // until the next periodic tick.
+            autoScaleReEvaluate.set(true);
+            // Close the lost-wakeup window: the in-flight run may have released the in-flight
+            // flag and already checked the pending flag between our failed CAS and the set
+            // above. If it has finished and the pending flag is still ours to claim, schedule
+            // the re-evaluation ourselves. The CAS makes sure only one side ever does so.
+            if (!autoScaleInFlight.get() && autoScaleReEvaluate.compareAndSet(true, false)) {
+                scheduler().execute(() -> runAutoScaleSafely("coalesced"));
+            }
+            return CompletableFuture.completedFuture(null);
+        }
+        try {
+            return collectConsumerCounts()
+                    .thenCombine(collectLoadSamples(), (consumers, load) ->
+                            AutoScalePolicyEvaluator.decide(currentLayout, load, consumers, config,
+                                    clock.millis(), lastSplitAtMs, lastMergeAtMs))
+                    .thenCompose(decision -> dispatch(decision, config, trigger))
+                    .whenComplete((__, ex) -> {
+                        autoScaleInFlight.set(false);
+                        if (autoScaleReEvaluate.getAndSet(false)) {
+                            // Re-run off the completion thread for the trigger(s) coalesced
+                            // while this evaluation was in flight.
+                            scheduler().execute(() -> runAutoScaleSafely("coalesced"));
+                        }
+                    });
+        } catch (Throwable t) {
+            // A synchronous throw between the CAS and the future chain would otherwise leave
+            // the in-flight flag set forever, silently disabling auto-scaling on this topic.
+            autoScaleInFlight.set(false);
+            throw t;
+        }
+    }
+
+    /**
+     * Carry out an evaluator decision: log it and start the corresponding split or merge.
+     * After a successful auto split, a follow-up evaluation is scheduled.
+     *
+     * @param decision the evaluator's verdict
+     * @param config   the policy the decision was made under (supplies the split cooldown)
+     * @param trigger  label of what caused the evaluation, attached to log records
+     * @return a future completing when the operation is done; already completed for no-action
+     */
+    private CompletableFuture<Void> dispatch(AutoScaleDecision decision, AutoScaleConfig config,
+                                             String trigger) {
+        if (decision instanceof AutoScaleDecision.Split toSplit) {
+            log.info().attr("segmentId", toSplit.segmentId()).attr("reason", toSplit.reason())
+                    .attr("trigger", trigger).log("Auto split");
+            return splitSegment(toSplit.segmentId())
+                    .thenAccept(__ -> scheduleFollowUpEvaluation(config));
+        } else if (decision instanceof AutoScaleDecision.Merge toMerge) {
+            log.info().attr("segmentId1", toMerge.segmentId1())
+                    .attr("segmentId2", toMerge.segmentId2())
+                    .attr("reason", toMerge.reason()).attr("trigger", trigger).log("Auto merge");
+            return mergeSegments(toMerge.segmentId1(), toMerge.segmentId2())
+                    .thenAccept(__ -> { });
+        } else {
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    /**
+     * After a successful auto split, queue a single extra evaluation for just after the split
+     * cooldown ends. A burst of consumers joining at once needs one split per cooldown to
+     * converge (e.g. 1 segment → N); without this it would converge one split per periodic
+     * tick, which is slower whenever the cooldown is shorter than the tick. The chain ends at
+     * the first evaluation that decides {@code NoAction}.
+     */
+    private void scheduleFollowUpEvaluation(AutoScaleConfig config) {
+        if (!closed && isLeader()) {
+            // +1 ms so the follow-up lands strictly after the cooldown has elapsed.
+            long afterCooldownMs = config.splitCooldown().toMillis() + 1;
+            scheduler().schedule(() -> runAutoScaleSafely("post-split"),
+                    afterCooldownMs, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Snapshot the number of consumers per controller-managed (STREAM/CHECKPOINT)
+     * subscription. QUEUE subscriptions bypass the controller and have no coordinator here,
+     * so they are naturally excluded — exactly the set the consumer-count split rule wants.
+     *
+     * @return an already-completed future holding subscription name → consumer count
+     */
+    private CompletableFuture<Map<String, Integer>> collectConsumerCounts() {
+        Map<String, Integer> perSubscription = new LinkedHashMap<>();
+        subscriptions.forEach((subscriptionName, coordinator) -> {
+            int consumerCount = coordinator.getConsumers().size();
+            perSubscription.put(subscriptionName, consumerCount);
+        });
+        return CompletableFuture.completedFuture(perSubscription);
+    }
+
+    /**
+     * Read the load record (value plus the Stat's modification timestamp) of every active
+     * segment. Segments without a stored record are simply absent from the result.
+     *
+     * @return a future completing with segment id → load sample once all reads have finished
+     */
+    private CompletableFuture<Map<Long, SegmentLoadSample>> collectLoadSamples() {
+        Map<Long, SegmentLoadSample> bySegment = new ConcurrentHashMap<>();
+        CompletableFuture<?>[] reads = currentLayout.getActiveSegments().keySet().stream()
+                .map(id -> resources.getSegmentLoadAsync(topicName, id)
+                        .thenAccept(found -> found.ifPresent(record -> bySegment.put(id,
+                                new SegmentLoadSample(record.getValue(),
+                                        record.getStat().getModificationTimestamp())))))
+                .toArray(CompletableFuture[]::new);
+        return CompletableFuture.allOf(reads).thenApply(__ -> bySegment);
+    }
+
+    /**
+     * The broker's service configuration, read through the broker service's Pulsar instance
+     * on every call. Callers in this class null-check the result before using it.
+     */
+    private ServiceConfiguration brokerConfig() {
+        return brokerService.getPulsar().getConfig();
+    }
+
+    /**
+     * Run one auto split/merge evaluation and return its future so tests can await it.
+     * Production code triggers evaluation via the periodic tick and consumer-change events.
+     * Unlike those paths, this calls the evaluation directly with trigger label {@code "test"},
+     * so a failure surfaces through the returned future.
+     *
+     * @return the future of the evaluation and any split/merge it dispatched
+     */
+    @VisibleForTesting
+    CompletableFuture<Void> evaluateAutoScaleForTest() {
+        return evaluateAndAct("test");
+    }
+
     /**
      * Load persisted subscriptions and consumer registrations from the 
metadata store and
      * install them into per-subscription {@link SubscriptionCoordinator} 
instances. Called
@@ -432,6 +662,9 @@ public class ScalableTopicController {
           .thenCompose(__ -> 
resources.getScalableTopicMetadataAsync(topicName, true))
           .thenCompose(optMd -> {
               currentLayout = SegmentLayout.fromMetadata(optMd.orElseThrow());
+              // Start the auto-split cooldown only now that the split 
actually happened
+              // (covers manual and auto splits; a failed attempt doesn't burn 
the cooldown).
+              lastSplitAtMs = nowMs;
 
               // Step 5: Notify subscriptions of layout change (triggers 
consumer reassignment)
               return notifySubscriptions(currentLayout);
@@ -478,6 +711,9 @@ public class ScalableTopicController {
           .thenCompose(__ -> 
resources.getScalableTopicMetadataAsync(topicName, true))
           .thenCompose(optMd -> {
               currentLayout = SegmentLayout.fromMetadata(optMd.orElseThrow());
+              // Start the auto-merge cooldown only now that the merge 
actually happened
+              // (covers manual and auto merges; a failed attempt doesn't burn 
the cooldown).
+              lastMergeAtMs = nowMs;
               return notifySubscriptions(currentLayout);
           }).thenApply(__ -> currentLayout);
     }
@@ -516,6 +752,9 @@ public class ScalableTopicController {
         }
         return coordinator.registerConsumer(consumerName, consumerId, cnx)
                 .thenApply(assignments -> {
+                    // A new consumer may now outnumber the segments — 
evaluate the
+                    // consumer-count split rule promptly rather than waiting 
for the tick.
+                    onConsumerCountChanged();
                     // Look up by name since the key may have been an existing 
session
                     return assignments.entrySet().stream()
                             .filter(e -> 
consumerName.equals(e.getKey().getConsumerName()))
@@ -961,7 +1200,11 @@ public class ScalableTopicController {
           })
           .thenCompose(__ -> {
               CompletableFuture<?>[] deletes = drained.stream()
-                      .map(this::deleteSegmentBackingTopic)
+                      .map(s -> deleteSegmentBackingTopic(s)
+                              // The segment is gone from the layout — drop 
its load record
+                              // too, or the .../segments/{id}/load entry 
leaks forever.
+                              .thenCompose(___ ->
+                                      
resources.deleteSegmentLoadAsync(topicName, s.segmentId())))
                       .toArray(CompletableFuture[]::new);
               return CompletableFuture.allOf(deletes);
           })
@@ -1094,6 +1337,7 @@ public class ScalableTopicController {
     public CompletableFuture<Void> close() {
         closed = true;
         cancelGcTask();
+        cancelAutoScaleTask();
         // Stop each coordinator's drain poller before clearing — otherwise 
the scheduler
         // task keeps running after the controller goes away.
         subscriptions.values().forEach(SubscriptionCoordinator::close);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
index aed7dd3f8e1..a3f1f0c5ef1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
@@ -18,11 +18,15 @@
  */
 package org.apache.pulsar.broker.service.scalable;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import lombok.Getter;
 import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
@@ -112,6 +116,43 @@ public class SegmentLayout {
                 .collect(Collectors.toList());
     }
 
+    /**
+     * Number of merge operations in a segment's ancestry, including the 
segment itself.
+     *
+     * <p>A merge is the only operation that produces a segment with more than 
one parent
+     * (a split produces children with exactly one parent), so the merge depth 
is the count
+     * of segments in this segment's ancestor chain — itself included — that 
have
+     * {@code parentIds.size() >= 2}.
+     *
+     * <p>Used by auto split/merge (PIP-483) to cap split↔merge churn: a pair 
is only
+     * merge-eligible while neither side's merge depth has reached the 
configured maximum,
+     * which bounds the merge depth of the resulting child.
+     *
+     * @param segmentId the segment to measure
+     * @return the number of merges in this segment's lineage (0 for a 
never-merged segment)
+     */
+    public int mergeDepth(long segmentId) {
+        // Expand the ancestry graph from the requested segment, counting multi-parent nodes.
+        Set<Long> seen = new HashSet<>();
+        Deque<Long> pending = new ArrayDeque<>();
+        pending.add(segmentId);
+        int merges = 0;
+        // ArrayDeque never holds null, so a null poll() means the frontier is exhausted.
+        for (Long next = pending.poll(); next != null; next = pending.poll()) {
+            // Shared ancestors are reachable along several paths; expand each only once.
+            SegmentInfo info = seen.add(next) ? allSegments.get(next) : null;
+            if (info == null) {
+                continue; // already expanded, or unknown to this layout
+            }
+            var parents = info.parentIds();
+            if (parents.size() > 1) {
+                merges++; // more than one parent: this node was produced by a merge
+            }
+            pending.addAll(parents);
+        }
+        return merges;
+    }
+
     /**
      * Produce a new layout by splitting a segment at its midpoint.
      *
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLoadReporter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLoadReporter.java
new file mode 100644
index 00000000000..56001951c4a
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLoadReporter.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.DoubleSupplier;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.SegmentLoadStats;
+
+/**
+ * Writes per-segment {@link SegmentLoadStats} to the metadata store on behalf 
of the broker
+ * that owns a segment's {@code segment://} topic (PIP-483).
+ *
+ * <p>To keep metadata write volume bounded, a sample is persisted only when 
it differs
+ * materially from the last value this broker wrote for that segment — i.e. 
some rate moved
+ * by more than the configured relative threshold (default 25%), or crossed 
to/from zero. A
+ * steady-state segment therefore writes once and then stays quiet; the 
controller's
+ * windowing relies on the stored record's {@code Stat} modification time 
staying put while
+ * load is unchanged.
+ *
+ * <p><b>Known blind spot</b> — the materiality band is anchored at the last 
<em>written</em>
+ * value, not at the policy thresholds. A true rate that settles inside the 
band never
+ * produces a new record, so a segment can sustain up to {@code threshold} 
beyond a split or
+ * merge threshold indefinitely without the controller seeing it. This is 
path-dependent
+ * (whether a given load triggers depends on what was last written) but 
one-directional: it
+ * can only delay an action, never cause a spurious one, and its magnitude is 
bounded by the
+ * configured threshold. Accepted as the cost of bounded write volume; 
operators wanting
+ * tighter tracking lower {@code scalableTopicLoadReportRateChangeThreshold}.
+ *
+ * <p>This class owns only the materiality decision and the last-written 
cache. Sampling the
+ * live {@code TopicStats} and scheduling the periodic sweep are wired in by 
the broker.
+ */
+public class SegmentLoadReporter {
+
+    private final ScalableTopicResources resources;
+    /** Re-read on every sample so the broker config knob is honored 
dynamically. */
+    private final DoubleSupplier rateChangeThreshold;
+
+    /** Last value written per load-record path, so we can skip immaterial 
updates. */
+    private final ConcurrentHashMap<String, SegmentLoadStats> lastWritten = 
new ConcurrentHashMap<>();
+
+    public SegmentLoadReporter(ScalableTopicResources resources, 
DoubleSupplier rateChangeThreshold) {
+        this.resources = resources;
+        this.rateChangeThreshold = rateChangeThreshold;
+    }
+
+    public SegmentLoadReporter(ScalableTopicResources resources, double 
rateChangeThreshold) {
+        this(resources, () -> rateChangeThreshold);
+    }
+
+    /**
+     * Report a segment's current load, writing to the store only if it 
changed materially
+     * since the last write (or has never been written).
+     *
+     * <p>On a local cache miss (broker restart, or segment ownership just 
moved here) the
+     * baseline is seeded from the record already in the store, and the 
materiality gate is
+     * applied against that. Without this, the first sample after every 
ownership move would
+     * write unconditionally and reset the record's modification time — which 
the controller
+     * uses as "cold since" for the merge window — starving merges under 
frequent rebalancing.
+     *
+     * @return a future completing with {@code true} if a write happened, 
{@code false} if the
+     *         sample was immaterial and skipped
+     */
+    public CompletableFuture<Boolean> reportIfChanged(TopicName topic, long 
segmentId,
+                                                      SegmentLoadStats 
current) {
+        String path = resources.segmentLoadPath(topic, segmentId);
+        double threshold = rateChangeThreshold.getAsDouble();
+        SegmentLoadStats last = lastWritten.get(path);
+        if (last == null) {
+            return resources.getSegmentLoadAsync(topic, 
segmentId).thenCompose(stored -> {
+                stored.ifPresent(result -> lastWritten.putIfAbsent(path, 
result.getValue()));
+                SegmentLoadStats baseline = lastWritten.get(path);
+                if (baseline != null && !isMaterialChange(baseline, current, 
threshold)) {
+                    return CompletableFuture.completedFuture(false);
+                }
+                return write(topic, segmentId, path, current);
+            });
+        }
+        if (!isMaterialChange(last, current, threshold)) {
+            return CompletableFuture.completedFuture(false);
+        }
+        return write(topic, segmentId, path, current);
+    }
+
+    private CompletableFuture<Boolean> write(TopicName topic, long segmentId, 
String path,
+                                             SegmentLoadStats current) {
+        return resources.reportSegmentLoadAsync(topic, segmentId, current)
+                .thenApply(__ -> {
+                    lastWritten.put(path, current);
+                    return true;
+                });
+    }
+
+    /**
+     * Drop the cached last-written value for a segment — call when this 
broker stops owning
+     * the segment topic (unload, seal, or delete) so the cache doesn't grow 
unboundedly with
+     * segment churn. A later re-acquire re-seeds the baseline from the stored 
record.
+     */
+    public void forget(TopicName topic, long segmentId) {
+        lastWritten.remove(resources.segmentLoadPath(topic, segmentId));
+    }
+
+    /**
+     * True if any of the four rates changed by more than {@code threshold} 
(relative), or
+     * crossed to/from zero.
+     */
+    static boolean isMaterialChange(SegmentLoadStats last, SegmentLoadStats 
current,
+                                    double threshold) {
+        return changed(last.msgRateIn(), current.msgRateIn(), threshold)
+                || changed(last.bytesRateIn(), current.bytesRateIn(), 
threshold)
+                || changed(last.msgRateOut(), current.msgRateOut(), threshold)
+                || changed(last.bytesRateOut(), current.bytesRateOut(), 
threshold);
+    }
+
+    private static boolean changed(double last, double current, double 
threshold) {
+        if (last == 0.0) {
+            // Any move off zero (a segment going from idle to active) is 
always material;
+            // staying at zero is not.
+            return current != 0.0;
+        }
+        return Math.abs(current - last) / last > threshold;
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLoadSample.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLoadSample.java
new file mode 100644
index 00000000000..4fb21d0c954
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLoadSample.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import org.apache.pulsar.common.scalable.SegmentLoadStats;
+
+/**
+ * A segment's load record as the controller sees it: the persisted {@link 
SegmentLoadStats}
+ * plus the metadata store's last-modified timestamp for the record (PIP-483).
+ *
+ * <p>This is an in-memory evaluator input, never persisted — the timestamp 
comes from the
+ * metadata {@code Stat}, not from the stored value. {@code modifiedAtMs} is 
what the merge
+ * pass uses to require a segment has stayed cold for at least {@code 
mergeWindow}.
+ *
+ * @param stats        the persisted rates
+ * @param modifiedAtMs metadata-store last-modified time of the load record, 
in epoch millis
+ */
+public record SegmentLoadSample(
+        SegmentLoadStats stats,
+        long modifiedAtMs) {
+    // Pure data carrier: no validation and no derived state.
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java
new file mode 100644
index 00000000000..acc104c2773
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.testng.annotations.Test;
+
+public class AutoScaleConfigTest {
+
+    /** Resolving {@code conf} must fail with an {@link IllegalArgumentException}. */
+    private static void assertRejected(ServiceConfiguration conf) {
+        assertThrows(IllegalArgumentException.class,
+                () -> AutoScaleConfig.fromBrokerConfig(conf));
+    }
+
+    @Test
+    public void testDefaultsMatchBrokerConfig() {
+        ServiceConfiguration brokerDefaults = new ServiceConfiguration();
+        AutoScaleConfig resolved = AutoScaleConfig.fromBrokerConfig(brokerDefaults);
+
+        // Enable switch, segment-count bounds and the merge-depth cap.
+        assertTrue(resolved.enabled());
+        assertEquals(resolved.maxSegments(), 64);
+        assertEquals(resolved.minSegments(), 1);
+        assertEquals(resolved.maxDagDepth(), 10);
+
+        // Timers.
+        assertEquals(resolved.splitCooldown(), Duration.ofSeconds(60));
+        assertEquals(resolved.mergeCooldown(), Duration.ofMinutes(5));
+        assertEquals(resolved.mergeWindow(), Duration.ofMinutes(5));
+
+        // Split thresholds.
+        assertEquals(resolved.splitMsgRateIn(), 10_000.0);
+        assertEquals(resolved.splitBytesRateIn(), 50_000_000.0);
+        assertEquals(resolved.splitMsgRateOut(), 50_000.0);
+        assertEquals(resolved.splitBytesRateOut(), 250_000_000.0);
+
+        // Merge thresholds.
+        assertEquals(resolved.mergeMsgRateIn(), 1_000.0);
+        assertEquals(resolved.mergeBytesRateIn(), 5_000_000.0);
+        assertEquals(resolved.mergeMsgRateOut(), 5_000.0);
+        assertEquals(resolved.mergeBytesRateOut(), 25_000_000.0);
+
+        // Hysteresis dead-band: every merge threshold lies strictly under its split twin.
+        assertTrue(resolved.mergeMsgRateIn() < resolved.splitMsgRateIn());
+        assertTrue(resolved.mergeBytesRateIn() < resolved.splitBytesRateIn());
+        assertTrue(resolved.mergeMsgRateOut() < resolved.splitMsgRateOut());
+        assertTrue(resolved.mergeBytesRateOut() < resolved.splitBytesRateOut());
+    }
+
+    @Test
+    public void testValidationRejectsBadConfig() {
+        // A split threshold of zero: the evaluator scores rate/threshold, which would give
+        // Infinity (or NaN for a zero rate), so resolution has to refuse it.
+        ServiceConfiguration zeroSplitThreshold = new ServiceConfiguration();
+        zeroSplitThreshold.setScalableTopicSplitMsgRateInThreshold(0);
+        assertRejected(zeroSplitThreshold);
+
+        // Inverted hysteresis: merge threshold raised to meet the split threshold.
+        ServiceConfiguration mergeMeetsSplit = new ServiceConfiguration();
+        mergeMeetsSplit.setScalableTopicMergeMsgRateInThreshold(
+                mergeMeetsSplit.getScalableTopicSplitMsgRateInThreshold());
+        assertRejected(mergeMeetsSplit);
+
+        // Segment bounds the wrong way round (min above max).
+        ServiceConfiguration minAboveMax = new ServiceConfiguration();
+        minAboveMax.setScalableTopicMinSegments(10);
+        minAboveMax.setScalableTopicMaxSegments(5);
+        assertRejected(minAboveMax);
+
+        // A cooldown below zero.
+        ServiceConfiguration belowZeroCooldown = new ServiceConfiguration();
+        belowZeroCooldown.setScalableTopicSplitCooldownSeconds(-1);
+        assertRejected(belowZeroCooldown);
+    }
+
+    @Test
+    public void testZeroMergeThresholdsAllowedAsMergeDisable() {
+        // Zeroed merge thresholds are the "never merge" setting (no rate can be strictly
+        // below 0, so nothing ever counts as cold); validation must let them through.
+        ServiceConfiguration mergeOff = new ServiceConfiguration();
+        mergeOff.setScalableTopicMergeMsgRateInThreshold(0);
+        mergeOff.setScalableTopicMergeBytesRateInThreshold(0L);
+        mergeOff.setScalableTopicMergeMsgRateOutThreshold(0);
+        mergeOff.setScalableTopicMergeBytesRateOutThreshold(0L);
+
+        AutoScaleConfig resolved = AutoScaleConfig.fromBrokerConfig(mergeOff);
+        assertEquals(resolved.mergeMsgRateIn(), 0.0);
+    }
+
+    @Test
+    public void testOverriddenBrokerConfigPropagates() {
+        ServiceConfiguration custom = new ServiceConfiguration();
+        custom.setScalableTopicAutoScaleEnabled(false);
+        custom.setScalableTopicMaxSegments(8);
+        custom.setScalableTopicMinSegments(2);
+        custom.setScalableTopicMaxDagDepth(3);
+        custom.setScalableTopicSplitCooldownSeconds(30);
+        custom.setScalableTopicMergeCooldownSeconds(120);
+        custom.setScalableTopicMergeWindowSeconds(90);
+        custom.setScalableTopicSplitMsgRateInThreshold(1234);
+        custom.setScalableTopicMergeBytesRateOutThreshold(99L);
+
+        // Every overridden broker setting must surface unchanged in the resolved config.
+        AutoScaleConfig resolved = AutoScaleConfig.fromBrokerConfig(custom);
+        assertFalse(resolved.enabled());
+        assertEquals(resolved.maxSegments(), 8);
+        assertEquals(resolved.minSegments(), 2);
+        assertEquals(resolved.maxDagDepth(), 3);
+        assertEquals(resolved.splitCooldown(), Duration.ofSeconds(30));
+        assertEquals(resolved.mergeCooldown(), Duration.ofSeconds(120));
+        assertEquals(resolved.mergeWindow(), Duration.ofSeconds(90));
+        assertEquals(resolved.splitMsgRateIn(), 1234.0);
+        assertEquals(resolved.mergeBytesRateOut(), 99.0);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluatorTest.java
new file mode 100644
index 00000000000..b85845de647
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluatorTest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.apache.pulsar.common.scalable.SegmentLoadStats;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for the pure {@link AutoScalePolicyEvaluator} decision function 
(PIP-483).
+ * All inputs are constructed in-memory; there is no broker or metadata store.
+ */
+public class AutoScalePolicyEvaluatorTest {
+
+    private static final long NOW = 1_700_000_000_000L;
+    private static final long NO_PRIOR = Long.MIN_VALUE;
+
+    // Split thresholds well above merge thresholds — the dead-band is the 
hysteresis.
+    private static final double SPLIT_MSG_IN = 10_000;
+    private static final double SPLIT_BYTES_IN = 50_000_000;
+    private static final double SPLIT_MSG_OUT = 50_000;
+    private static final double SPLIT_BYTES_OUT = 250_000_000;
+    private static final double MERGE_MSG_IN = 1_000;
+    private static final double MERGE_BYTES_IN = 5_000_000;
+    private static final double MERGE_MSG_OUT = 5_000;
+    private static final double MERGE_BYTES_OUT = 25_000_000;
+
+    private static AutoScaleConfig.AutoScaleConfigBuilder baseConfig() {
+        return AutoScaleConfig.builder()
+                .enabled(true)
+                .maxSegments(64)
+                .minSegments(1)
+                .maxDagDepth(10)
+                .splitCooldown(Duration.ofMinutes(1))
+                .mergeCooldown(Duration.ofMinutes(5))
+                .mergeWindow(Duration.ofMinutes(5))
+                .splitMsgRateIn(SPLIT_MSG_IN)
+                .splitBytesRateIn(SPLIT_BYTES_IN)
+                .splitMsgRateOut(SPLIT_MSG_OUT)
+                .splitBytesRateOut(SPLIT_BYTES_OUT)
+                .mergeMsgRateIn(MERGE_MSG_IN)
+                .mergeBytesRateIn(MERGE_BYTES_IN)
+                .mergeMsgRateOut(MERGE_MSG_OUT)
+                .mergeBytesRateOut(MERGE_BYTES_OUT);
+    }
+
+    private static SegmentLayout initialLayout(int segments) {
+        return SegmentLayout.fromMetadata(
+                ScalableTopicController.createInitialMetadata(segments, 
Map.of()));
+    }
+
+    /** A load sample with the given rates, last modified {@code ageMs} ago. */
+    private static SegmentLoadSample sample(double msgIn, double bytesIn, 
double msgOut,
+                                            double bytesOut, long ageMs) {
+        return new SegmentLoadSample(
+                new SegmentLoadStats(msgIn, bytesIn, msgOut, bytesOut), NOW - 
ageMs);
+    }
+
+    private static SegmentLoadSample cold(long ageMs) {
+        return sample(0, 0, 0, 0, ageMs);
+    }
+
+    private static long old() {
+        return Duration.ofMinutes(10).toMillis();
+    }
+
+    private static AutoScaleDecision decide(SegmentLayout layout,
+                                            Map<Long, SegmentLoadSample> load,
+                                            Map<String, Integer> consumers,
+                                            AutoScaleConfig config) {
+        return AutoScalePolicyEvaluator.decide(layout, load, consumers, 
config, NOW,
+                NO_PRIOR, NO_PRIOR);
+    }
+
+    // --- enable switch ---
+
+    @Test
+    public void testDisabledReturnsNoAction() {
+        SegmentLayout layout = initialLayout(1);
+        Map<Long, SegmentLoadSample> load = Map.of(0L, sample(1_000_000, 0, 0, 
0, old()));
+        AutoScaleDecision d = decide(layout, load, Map.of("s", 100),
+                baseConfig().enabled(false).build());
+        assertTrue(d instanceof AutoScaleDecision.NoAction);
+    }
+
+    // --- consumer-driven split ---
+
+    @Test
+    public void testConsumerDrivenSplitTargetsBusiestSegment() {
+        SegmentLayout layout = initialLayout(2);
+        Map<Long, SegmentLoadSample> load = Map.of(
+                0L, sample(100, 0, 0, 0, old()),
+                1L, sample(200, 0, 0, 0, old()));
+        // One subscription with 3 consumers but only 2 segments → need a 3rd 
segment.
+        AutoScaleDecision d = decide(layout, load, Map.of("sub", 3), 
baseConfig().build());
+        assertTrue(d instanceof AutoScaleDecision.Split, d.toString());
+        AutoScaleDecision.Split s = (AutoScaleDecision.Split) d;
+        assertEquals(s.segmentId(), 1L, "should split the busiest segment by 
msgRateIn");
+        assertEquals(s.reason(), "consumer-count");
+    }
+
+    @Test
+    public void testConsumerCountUsesPerSubscriptionMaxNotSum() {
+        SegmentLayout layout = initialLayout(2);
+        // Fresh samples (age 0) so the merge pass can't fire — isolates the 
consumer check.
+        Map<Long, SegmentLoadSample> load = Map.of(0L, cold(0), 1L, cold(0));
+        // Two subscriptions, 2 consumers each. Per-subscription max is 2, not 
4 → 2 == 2
+        // segments, no scale-up needed.
+        AutoScaleDecision d = decide(layout, load, Map.of("a", 2, "b", 2), 
baseConfig().build());
+        assertTrue(d instanceof AutoScaleDecision.NoAction,
+                "per-subscription max (2) does not exceed 2 segments");
+    }
+
+    @Test
+    public void testConsumerDrivenSplitRespectsMaxSegments() {
+        SegmentLayout layout = initialLayout(2);
+        // Fresh samples so the merge pass can't fire — isolates the split 
suppression.
+        Map<Long, SegmentLoadSample> load = Map.of(0L, cold(0), 1L, cold(0));
+        AutoScaleDecision d = decide(layout, load, Map.of("sub", 5),
+                baseConfig().maxSegments(2).build());
+        assertTrue(d instanceof AutoScaleDecision.NoAction, "at maxSegments, 
no split");
+    }
+
+    @Test
+    public void testConsumerDrivenSplitRespectsSplitCooldown() {
+        SegmentLayout layout = initialLayout(2);
+        Map<Long, SegmentLoadSample> load = Map.of(0L, cold(0), 1L, cold(0));
+        long recentSplit = NOW - Duration.ofSeconds(30).toMillis(); // < 1m 
cooldown
+        AutoScaleDecision d = AutoScalePolicyEvaluator.decide(layout, load, 
Map.of("sub", 3),
+                baseConfig().build(), NOW, recentSplit, NO_PRIOR);
+        assertTrue(d instanceof AutoScaleDecision.NoAction, "within split 
cooldown, no split");
+    }
+
+    // --- load-driven split ---
+
+    @Test
+    public void testLoadDrivenSplitOnMsgRateIn() {
+        SegmentLayout layout = initialLayout(1);
+        Map<Long, SegmentLoadSample> load = Map.of(0L, sample(SPLIT_MSG_IN + 
1, 0, 0, 0, old()));
+        AutoScaleDecision d = decide(layout, load, Map.of(), 
baseConfig().build());
+        assertTrue(d instanceof AutoScaleDecision.Split, d.toString());
+        AutoScaleDecision.Split s = (AutoScaleDecision.Split) d;
+        assertEquals(s.segmentId(), 0L);
+        assertEquals(s.reason(), "msgRateIn");
+    }
+
+    @Test
+    public void testLoadDrivenSplitOnBytesRateOut() {
+        SegmentLayout layout = initialLayout(1);
+        Map<Long, SegmentLoadSample> load =
+                Map.of(0L, sample(0, 0, 0, SPLIT_BYTES_OUT + 1, old()));
+        AutoScaleDecision d = decide(layout, load, Map.of(), 
baseConfig().build());
+        assertTrue(d instanceof AutoScaleDecision.Split, d.toString());
+        assertEquals(((AutoScaleDecision.Split) d).reason(), "bytesRateOut");
+    }
+
+    @Test
+    public void testLoadDrivenSplitPicksMostOverloaded() {
+        SegmentLayout layout = initialLayout(2);
+        // seg0 at 1.1x, seg1 at 1.5x of the msgRateIn split threshold.
+        Map<Long, SegmentLoadSample> load = Map.of(
+                0L, sample(SPLIT_MSG_IN * 1.1, 0, 0, 0, old()),
+                1L, sample(SPLIT_MSG_IN * 1.5, 0, 0, 0, old()));
+        AutoScaleDecision d = decide(layout, load, Map.of(), 
baseConfig().build());
+        assertTrue(d instanceof AutoScaleDecision.Split, d.toString());
+        assertEquals(((AutoScaleDecision.Split) d).segmentId(), 1L,
+                "should split the more overloaded segment");
+    }
+
+    @Test
+    public void testNoSplitWhenAllUnderThreshold() {
+        SegmentLayout layout = initialLayout(1);
+        // Just below every split threshold, freshly updated → not 
merge-eligible either.
+        Map<Long, SegmentLoadSample> load = Map.of(0L,
+                sample(SPLIT_MSG_IN - 1, SPLIT_BYTES_IN - 1, SPLIT_MSG_OUT - 1,
+                        SPLIT_BYTES_OUT - 1, 0));
+        AutoScaleDecision d = decide(layout, load, Map.of(), 
baseConfig().build());
+        assertTrue(d instanceof AutoScaleDecision.NoAction);
+    }
+
+    // --- merge ---
+
+    @Test
+    public void testMergeColdAdjacentPair() {
+        SegmentLayout layout = initialLayout(2);
+        Map<Long, SegmentLoadSample> load = Map.of(0L, cold(old()), 1L, 
cold(old()));
+        AutoScaleDecision d = decide(layout, load, Map.of(), 
baseConfig().build());
+        assertTrue(d instanceof AutoScaleDecision.Merge, d.toString());
+        AutoScaleDecision.Merge m = (AutoScaleDecision.Merge) d;
+        assertTrue((m.segmentId1() == 0L && m.segmentId2() == 1L)
+                || (m.segmentId1() == 1L && m.segmentId2() == 0L));
+    }
+
+    @Test
+    public void testMergeRespectsMinSegments() {
+        SegmentLayout layout = initialLayout(2);
+        Map<Long, SegmentLoadSample> load = Map.of(0L, cold(old()), 1L, 
cold(old()));
+        AutoScaleDecision d = decide(layout, load, Map.of(), 
baseConfig().minSegments(2).build());
+        assertTrue(d instanceof AutoScaleDecision.NoAction, "at minSegments, 
no merge");
+    }
+
+    @Test
+    public void testMergeRespectsMergeCooldown() {
+        SegmentLayout layout = initialLayout(2);
+        Map<Long, SegmentLoadSample> load = Map.of(0L, cold(old()), 1L, 
cold(old()));
+        long recentMerge = NOW - Duration.ofMinutes(1).toMillis(); // < 5m 
cooldown
+        AutoScaleDecision d = AutoScalePolicyEvaluator.decide(layout, load, 
Map.of(),
+                baseConfig().build(), NOW, NO_PRIOR, recentMerge);
+        assertTrue(d instanceof AutoScaleDecision.NoAction, "within merge 
cooldown, no merge");
+    }
+
+    @Test
+    public void testMergeRequiresColdForFullWindow() {
+        SegmentLayout layout = initialLayout(2);
+        // Cold values, but only updated 1 minute ago — window is 5 minutes.
+        long recent = Duration.ofMinutes(1).toMillis();
+        Map<Long, SegmentLoadSample> load = Map.of(0L, cold(recent), 1L, 
cold(recent));
+        AutoScaleDecision d = decide(layout, load, Map.of(), 
baseConfig().build());
+        assertTrue(d instanceof AutoScaleDecision.NoAction,
+                "segment must stay cold for the full mergeWindow");
+    }
+
+    @Test
+    public void testMergeRequiresLoadRecordPresent() {
+        SegmentLayout layout = initialLayout(2);
+        // No load records at all → no evidence of durable coldness → never 
merge.
+        AutoScaleDecision d = decide(layout, new HashMap<>(), Map.of(), 
baseConfig().build());
+        assertTrue(d instanceof AutoScaleDecision.NoAction);
+    }
+
+    @Test
+    public void testHysteresisDeadBandNoSplitNoMerge() {
+        SegmentLayout layout = initialLayout(2);
+        // msgRateIn sits between the merge threshold and the split threshold 
for seg0:
+        // not hot enough to split, not cold enough to merge.
+        Map<Long, SegmentLoadSample> load = Map.of(
+                0L, sample(MERGE_MSG_IN + 1, 0, 0, 0, old()),
+                1L, cold(old()));
+        AutoScaleDecision d = decide(layout, load, Map.of(), 
baseConfig().build());
+        assertTrue(d instanceof AutoScaleDecision.NoAction,
+                "in the dead-band, neither split nor merge");
+    }
+
+    @Test
+    public void testSplitTakesPriorityOverMerge() {
+        SegmentLayout layout = initialLayout(2);
+        // seg0 hot (would split), seg0+seg1 also a cold-ish adjacent pair — 
split wins.
+        Map<Long, SegmentLoadSample> load = Map.of(
+                0L, sample(SPLIT_MSG_IN + 1, 0, 0, 0, old()),
+                1L, cold(old()));
+        AutoScaleDecision d = decide(layout, load, Map.of(), 
baseConfig().build());
+        assertTrue(d instanceof AutoScaleDecision.Split, "split has priority");
+    }
+
+    @Test
+    public void testMergeRespectsMaxDagDepth() {
+        // Build a layout whose two active segments each already have merge 
depth 1:
+        // split(0) → {1,2}; merge(1,2) → {3}; split(3) → {4,5}. Segments 4 
and 5 each have
+        // one merge (node 3) in their ancestry.
+        SegmentLayout layout = initialLayout(1)
+                .splitSegment(0, NOW)
+                .mergeSegments(1, 2, NOW)
+                .splitSegment(3, NOW);
+        List<Long> active = 
layout.getActiveSegments().keySet().stream().sorted().toList();
+        assertEquals(active, List.of(4L, 5L));
+        assertEquals(layout.mergeDepth(4L), 1);
+        assertEquals(layout.mergeDepth(5L), 1);
+
+        Map<Long, SegmentLoadSample> load = Map.of(4L, cold(old()), 5L, 
cold(old()));
+
+        // maxDagDepth=1: both at the cap → no merge.
+        AutoScaleDecision blocked = decide(layout, load, Map.of(),
+                baseConfig().maxDagDepth(1).build());
+        assertTrue(blocked instanceof AutoScaleDecision.NoAction, "merge 
blocked at maxDagDepth");
+
+        // maxDagDepth=2: under the cap → merge allowed.
+        AutoScaleDecision allowed = decide(layout, load, Map.of(),
+                baseConfig().maxDagDepth(2).build());
+        assertTrue(allowed instanceof AutoScaleDecision.Merge, "merge allowed 
below maxDagDepth");
+    }
+
+    @Test
+    public void testMergePicksColdestAdjacentPair() {
+        SegmentLayout layout = initialLayout(4); // ranges tile [0,MAX] in 4 
adjacent quarters
+        // seg0+seg1 combined hotter than seg2+seg3; all below merge 
thresholds and old.
+        Map<Long, SegmentLoadSample> load = Map.of(
+                0L, sample(900, 0, 0, 0, old()),
+                1L, sample(900, 0, 0, 0, old()),
+                2L, sample(10, 0, 0, 0, old()),
+                3L, sample(10, 0, 0, 0, old()));
+        AutoScaleDecision d = decide(layout, load, Map.of(), 
baseConfig().build());
+        assertTrue(d instanceof AutoScaleDecision.Merge, d.toString());
+        AutoScaleDecision.Merge m = (AutoScaleDecision.Merge) d;
+        // The coldest adjacent pair is {2,3}.
+        assertTrue((m.segmentId1() == 2L && m.segmentId2() == 3L)
+                || (m.segmentId1() == 3L && m.segmentId2() == 2L),
+                "should pick the coldest adjacent pair {2,3}, got " + m);
+    }
+
+    @Test
+    public void testMergedPairIsAlwaysAdjacent() {
+        SegmentLayout layout = initialLayout(4);
+        Map<Long, SegmentLoadSample> load = Map.of(
+                0L, cold(old()), 1L, cold(old()), 2L, cold(old()), 3L, 
cold(old()));
+        AutoScaleDecision d = decide(layout, load, Map.of(), 
baseConfig().build());
+        assertTrue(d instanceof AutoScaleDecision.Merge, d.toString());
+        AutoScaleDecision.Merge m = (AutoScaleDecision.Merge) d;
+        SegmentInfo a = layout.getAllSegments().get(m.segmentId1());
+        SegmentInfo b = layout.getAllSegments().get(m.segmentId2());
+        assertTrue(a.hashRange().isAdjacentTo(b.hashRange()),
+                "merged pair must be hash-range adjacent");
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java
new file mode 100644
index 00000000000..74ad5e8a5a3
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.TransportCnx;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.ScalableTopics;
+import org.apache.pulsar.client.admin.Topics;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.SegmentLoadStats;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.coordination.CoordinationService;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
+import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for the controller's auto split/merge wiring (PIP-483): 
the periodic /
+ * event-driven evaluation reads load records + consumer counts, runs the 
evaluator, and
+ * dispatches to the real splitSegment / mergeSegments paths (against an 
in-memory metadata
+ * store and a mocked cross-broker admin client). The decision logic itself is 
unit-tested in
+ * {@link AutoScalePolicyEvaluatorTest}; here we verify the plumbing actually 
fires.
+ */
+public class ScalableTopicControllerAutoScaleTest {
+
+    private static final String BROKER_ID = "broker-test";
+
+    private MetadataStoreExtended store;
+    private CoordinationService coordinationService;
+    private ScalableTopicResources resources;
+    private ScheduledExecutorService scheduler;
+    private BrokerService brokerService;
+    private PulsarService pulsar;
+    private ServiceConfiguration config;
+    private ScalableTopics scalableTopics;
+    private ScalableTopicController controller;
+    private TopicName topicName;
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        store = new LocalMemoryMetadataStore("memory:local", 
MetadataStoreConfig.builder().build());
+        coordinationService = new CoordinationServiceImpl(store);
+        resources = new ScalableTopicResources(store, 30);
+        scheduler = Executors.newSingleThreadScheduledExecutor();
+        topicName = TopicName.get("topic://tenant/ns/my-topic");
+
+        // Auto-scale tuned for deterministic single-shot evaluation: no 
cooldowns/windows so a
+        // single evaluateAutoScaleForTest() call acts immediately, low-ish 
thresholds.
+        config = new ServiceConfiguration();
+        config.setScalableTopicAutoScaleEnabled(true);
+        config.setScalableTopicMaxSegments(64);
+        config.setScalableTopicMinSegments(1);
+        config.setScalableTopicSplitCooldownSeconds(0);
+        config.setScalableTopicMergeCooldownSeconds(0);
+        config.setScalableTopicMergeWindowSeconds(0);
+        config.setScalableTopicSplitMsgRateInThreshold(10_000);
+
+        brokerService = mock(BrokerService.class);
+        pulsar = mock(PulsarService.class);
+        PulsarAdmin admin = mock(PulsarAdmin.class);
+        Topics topics = mock(Topics.class);
+        scalableTopics = mock(ScalableTopics.class);
+
+        when(brokerService.getPulsar()).thenReturn(pulsar);
+        when(brokerService.getTopicIfExists(anyString()))
+                
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+        when(pulsar.getBrokerId()).thenReturn(BROKER_ID);
+        when(pulsar.getExecutor()).thenReturn(scheduler);
+        when(pulsar.getConfig()).thenReturn(config);
+        when(pulsar.getAdminClient()).thenReturn(admin);
+        when(admin.topics()).thenReturn(topics);
+        when(admin.scalableTopics()).thenReturn(scalableTopics);
+        when(scalableTopics.createSegmentAsync(anyString(), any()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(scalableTopics.terminateSegmentAsync(anyString()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+    }
+
+    /**
+     * Releases the per-test fixtures created in {@code setUp} and by the
+     * individual tests.
+     *
+     * <p>Each step runs in a {@code finally} block so that a failure in an
+     * earlier step (for example {@code controller.close()} completing
+     * exceptionally) still closes the remaining resources and stops the
+     * scheduler thread, instead of leaking them into later test methods.
+     *
+     * @throws Exception if closing the coordination service or the store fails
+     */
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        try {
+            if (controller != null) {
+                controller.close().join();
+            }
+        } finally {
+            try {
+                if (coordinationService != null) {
+                    coordinationService.close();
+                }
+            } finally {
+                try {
+                    if (store != null) {
+                        store.close();
+                    }
+                } finally {
+                    // Always stop the executor thread created in setUp.
+                    if (scheduler != null) {
+                        scheduler.shutdownNow();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates the scalable-topic metadata for {@code topicName} with the
+     * requested number of initial segments, then constructs and initializes
+     * the controller under test. Both asynchronous steps are awaited before
+     * this method returns.
+     *
+     * @param initialSegments segment count passed to
+     *                        {@code ScalableTopicController.createInitialMetadata}
+     * @throws Exception if creating the metadata or initializing the
+     *                   controller fails
+     */
+    private void startController(int initialSegments) throws Exception {
+        resources.createScalableTopicAsync(topicName,
+                ScalableTopicController.createInitialMetadata(initialSegments, 
Map.of())).get();
+        controller = new ScalableTopicController(topicName, resources, 
brokerService,
+                coordinationService);
+        controller.initialize().get();
+    }
+
+    /**
+     * Returns the number of active segments in the controller's current
+     * layout; the tests use it to observe whether a split or merge happened.
+     */
+    private int activeSegmentCount() throws Exception {
+        return controller.getLayout().get().getActiveSegments().size();
+    }
+
+    @Test
+    public void testLoadDrivenSplit() throws Exception {
+        startController(2);
+        assertEquals(activeSegmentCount(), 2);
+
+        // Segment 0 is hot on msgRateIn → expect a split.
+        resources.reportSegmentLoadAsync(topicName, 0,
+                new SegmentLoadStats(20_000, 0, 0, 0)).get();
+
+        controller.evaluateAutoScaleForTest().get();
+        assertEquals(activeSegmentCount(), 3, "hot segment should have been 
split");
+    }
+
+    @Test
+    public void testNoSplitWhenUnderThreshold() throws Exception {
+        startController(2);
+        resources.reportSegmentLoadAsync(topicName, 0,
+                new SegmentLoadStats(100, 0, 0, 0)).get();
+
+        controller.evaluateAutoScaleForTest().get();
+        assertEquals(activeSegmentCount(), 2, "no segment over threshold → no 
change");
+    }
+
+    @Test
+    public void testDisabledConfigIsNoOp() throws Exception {
+        config.setScalableTopicAutoScaleEnabled(false);
+        startController(2);
+        resources.reportSegmentLoadAsync(topicName, 0,
+                new SegmentLoadStats(1_000_000, 0, 0, 0)).get();
+
+        controller.evaluateAutoScaleForTest().get();
+        assertEquals(activeSegmentCount(), 2, "disabled → no action even when 
hot");
+    }
+
+    @Test
+    public void testColdSegmentsMerge() throws Exception {
+        startController(4);
+        // All segments cold (no load records written → treated as zero; but 
merge requires a
+        // record present, so write explicit cold records). mergeWindow=0 so 
they're eligible.
+        for (long id = 0; id < 4; id++) {
+            resources.reportSegmentLoadAsync(topicName, id, new 
SegmentLoadStats(1, 0, 0, 0)).get();
+        }
+
+        controller.evaluateAutoScaleForTest().get();
+        assertEquals(activeSegmentCount(), 3, "a cold adjacent pair should 
have been merged");
+    }
+
+    @Test
+    public void testConsumerDrivenSplit() throws Exception {
+        startController(1);
+        assertEquals(activeSegmentCount(), 1);
+
+        // Two consumers on one segment → need a second segment. 
registerConsumer fires an
+        // event-driven evaluation (fire-and-forget); await its effect.
+        controller.registerConsumer("sub", "c1", 1L, 
ScalableConsumerType.STREAM,
+                mock(TransportCnx.class)).get();
+        controller.registerConsumer("sub", "c2", 2L, 
ScalableConsumerType.STREAM,
+                mock(TransportCnx.class)).get();
+
+        Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(
+                () -> assertEquals(activeSegmentCount(), 2,
+                        "2 consumers on 1 segment should drive a split"));
+    }
+
+    @Test
+    public void testConsumerBurstConvergesWithoutTicks() throws Exception {
+        // A group of consumers joining in quick succession must converge to 
one segment
+        // each purely from the event-driven evaluations + post-split 
follow-up chain — no
+        // periodic tick and no manual evaluation calls.
+        startController(1);
+        for (int i = 1; i <= 4; i++) {
+            controller.registerConsumer("sub", "c" + i, i, 
ScalableConsumerType.STREAM,
+                    mock(TransportCnx.class)).get();
+        }
+        Awaitility.await().atMost(Duration.ofSeconds(20)).untilAsserted(
+                () -> assertEquals(activeSegmentCount(), 4,
+                        "4 consumers must drive convergence to 4 segments"));
+    }
+
+    @Test
+    public void testSplitCooldownBlocksSecondSplit() throws Exception {
+        config.setScalableTopicSplitCooldownSeconds(3600); // 1h — blocks a 
second split
+        startController(2);
+        resources.reportSegmentLoadAsync(topicName, 0,
+                new SegmentLoadStats(20_000, 0, 0, 0)).get();
+
+        controller.evaluateAutoScaleForTest().get();
+        assertEquals(activeSegmentCount(), 3, "first split happens");
+
+        // Still hot, but within cooldown → no second split.
+        resources.reportSegmentLoadAsync(topicName, 1,
+                new SegmentLoadStats(20_000, 0, 0, 0)).get();
+        controller.evaluateAutoScaleForTest().get();
+        assertEquals(activeSegmentCount(), 3, "second split blocked by 
cooldown");
+    }
+
+    @Test
+    public void testSplitCooldownSurvivesLeaderFailover() throws Exception {
+        config.setScalableTopicSplitCooldownSeconds(3600);
+        startController(2);
+        resources.reportSegmentLoadAsync(topicName, 0,
+                new SegmentLoadStats(20_000, 0, 0, 0)).get();
+        controller.evaluateAutoScaleForTest().get();
+        assertEquals(activeSegmentCount(), 3, "first split happens");
+
+        // Leadership moves: close this controller and elect a fresh one. The 
new leader's
+        // in-memory cooldown clocks must be re-seeded from the layout (the 
children's
+        // createdAtMs records when the last split ran), not reset to "never".
+        controller.close().join();
+        controller = new ScalableTopicController(topicName, resources, 
brokerService,
+                coordinationService);
+        controller.initialize().get();
+
+        resources.reportSegmentLoadAsync(topicName, 1,
+                new SegmentLoadStats(20_000, 0, 0, 0)).get();
+        controller.evaluateAutoScaleForTest().get();
+        assertEquals(activeSegmentCount(), 3,
+                "split cooldown must survive failover via layout-derived 
seeding");
+    }
+
+    @Test
+    public void testFailedSplitDoesNotBurnCooldown() throws Exception {
+        config.setScalableTopicSplitCooldownSeconds(3600);
+        startController(2);
+        resources.reportSegmentLoadAsync(topicName, 0,
+                new SegmentLoadStats(20_000, 0, 0, 0)).get();
+
+        // First attempt fails at the segment-topic-creation step. The 
evaluation future
+        // surfaces the failure (the production tick wrapper logs-and-swallows 
it).
+        when(scalableTopics.createSegmentAsync(anyString(), any()))
+                .thenReturn(CompletableFuture.failedFuture(new 
RuntimeException("injected")));
+        assertThrows(java.util.concurrent.ExecutionException.class,
+                () -> controller.evaluateAutoScaleForTest().get());
+        assertEquals(activeSegmentCount(), 2, "failed split leaves the layout 
unchanged");
+
+        // The failure must not have started the cooldown: once the transient 
error clears,
+        // the next evaluation splits immediately instead of waiting out the 
hour.
+        when(scalableTopics.createSegmentAsync(anyString(), any()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        controller.evaluateAutoScaleForTest().get();
+        assertEquals(activeSegmentCount(), 3, "retry after a failed split is 
not cooldown-blocked");
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
index 4a0cca1eb78..dccd8d0e307 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
@@ -693,6 +693,10 @@ public class ScalableTopicControllerTest {
         controller.splitSegment(0).get();
         assertEquals(controller.sealedSegmentCount(), sealedBefore + 1);
 
+        // Give the doomed segment a load record, as the owning broker's 
reporter would.
+        resources.reportSegmentLoadAsync(topicName, 0,
+                new org.apache.pulsar.common.scalable.SegmentLoadStats(1, 1, 
1, 1)).get();
+
         // Tick at the seal time — retention not yet elapsed; nothing pruned.
         controller.runGcTickAsync().get();
         
assertTrue(controller.getLayout().get().getAllSegments().containsKey(0L),
@@ -705,6 +709,9 @@ public class ScalableTopicControllerTest {
                 "tick past retention must prune the sealed segment");
         // Backing topic delete was issued via the segment-aware admin call.
         verify(scalableTopics).deleteSegmentAsync(anyString(), anyBoolean());
+        // The pruned segment's load record is deleted along with it.
+        assertFalse(resources.getSegmentLoadAsync(topicName, 
0).get().isPresent(),
+                "prune must delete the segment's load record");
     }
 
     /**
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java
index 47271c2dc02..8ece3651e5a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java
@@ -327,9 +327,20 @@ public class ScalableTopicServiceTest {
         service.createScalableTopic(tn, 2).get();
         assertTrue(resources.scalableTopicExistsAsync(tn).get());
 
+        // Seed child records under the topic — a subscription and a 
per-segment load
+        // record — to verify the delete is recursive and takes everything 
with it.
+        resources.createSubscriptionAsync(tn, "sub-a",
+                
org.apache.pulsar.broker.resources.SubscriptionType.STREAM).get();
+        resources.reportSegmentLoadAsync(tn, 0,
+                new org.apache.pulsar.common.scalable.SegmentLoadStats(1, 1, 
1, 1)).get();
+
         service.deleteScalableTopic(tn).get();
 
         assertFalse(resources.scalableTopicExistsAsync(tn).get());
+        assertFalse(resources.subscriptionExistsAsync(tn, "sub-a").get(),
+                "topic delete must remove subscription records");
+        assertFalse(resources.getSegmentLoadAsync(tn, 0).get().isPresent(),
+                "topic delete must remove segment load records");
         verify(scalableTopicsAdmin, org.mockito.Mockito.atLeast(2))
                 .deleteSegmentAsync(anyString(), anyBoolean());
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
index b936820072f..364c9299a11 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
@@ -291,4 +291,37 @@ public class SegmentLayoutTest {
         SegmentLayout split2 = split1.splitSegment(1, 0L);
         assertEquals(split2.getNextSegmentId(), 6);
     }
+
+    @Test
+    public void testMergeDepthZeroForNeverMergedSegments() {
+        SegmentLayout layout = SegmentLayout.fromMetadata(
+                ScalableTopicController.createInitialMetadata(2, Map.of()));
+        assertEquals(layout.mergeDepth(0), 0);
+        assertEquals(layout.mergeDepth(1), 0);
+
+        // Splits never increase merge depth.
+        SegmentLayout afterSplit = layout.splitSegment(0, 0L);
+        assertEquals(afterSplit.mergeDepth(2), 0);
+        assertEquals(afterSplit.mergeDepth(3), 0);
+    }
+
+    @Test
+    public void testMergeDepthCountsMergesInLineage() {
+        // split(0) → {1,2}; merge(1,2) → {3}; split(3) → {4,5}.
+        SegmentLayout layout = SegmentLayout
+                .fromMetadata(ScalableTopicController.createInitialMetadata(1, 
Map.of()))
+                .splitSegment(0, 0L)
+                .mergeSegments(1, 2, 0L)
+                .splitSegment(3, 0L);
+
+        // The merged node 3 contributes one merge to the ancestry of 4 and 5.
+        assertEquals(layout.mergeDepth(3), 1);
+        assertEquals(layout.mergeDepth(4), 1);
+        assertEquals(layout.mergeDepth(5), 1);
+
+        // A second merge in the lineage bumps it to 2.
+        SegmentLayout twice = layout.mergeSegments(4, 5, 0L);
+        long mergedAgain = twice.getNextSegmentId() - 1;
+        assertEquals(twice.mergeDepth(mergedAgain), 2);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLoadReporterTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLoadReporterTest.java
new file mode 100644
index 00000000000..d35c074105e
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLoadReporterTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.util.Optional;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.SegmentLoadStats;
+import org.apache.pulsar.metadata.api.CacheGetResult;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class SegmentLoadReporterTest {
+
+    private static final double THRESHOLD = 0.25;
+    private static final TopicName TOPIC = 
TopicName.get("topic://tenant/ns/my-topic");
+
+    private MetadataStoreExtended store;
+    private ScalableTopicResources resources;
+    private SegmentLoadReporter reporter;
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        store = new LocalMemoryMetadataStore("memory:local", 
MetadataStoreConfig.builder().build());
+        resources = new ScalableTopicResources(store, 30);
+        reporter = new SegmentLoadReporter(resources, THRESHOLD);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        if (store != null) {
+            store.close();
+        }
+    }
+
+    /**
+     * Builds a load sample whose first component ({@code msgRateIn}) is
+     * {@code msgIn} and whose remaining three rate components are zero.
+     */
+    private static SegmentLoadStats stats(double msgIn) {
+        return new SegmentLoadStats(msgIn, 0, 0, 0);
+    }
+
+    // --- isMaterialChange (pure) ---
+
+    @Test
+    public void testMaterialChangeRelativeThreshold() {
+        // +30% on msgRateIn exceeds the 25% threshold.
+        assertTrue(SegmentLoadReporter.isMaterialChange(stats(1000), 
stats(1300), THRESHOLD));
+        // +10% does not.
+        assertFalse(SegmentLoadReporter.isMaterialChange(stats(1000), 
stats(1100), THRESHOLD));
+        // -30% (drop) is symmetric and material.
+        assertTrue(SegmentLoadReporter.isMaterialChange(stats(1000), 
stats(700), THRESHOLD));
+    }
+
+    @Test
+    public void testMaterialChangeCrossingZero() {
+        assertTrue(SegmentLoadReporter.isMaterialChange(stats(0), stats(1), 
THRESHOLD));
+        assertTrue(SegmentLoadReporter.isMaterialChange(stats(1), stats(0), 
THRESHOLD));
+        assertFalse(SegmentLoadReporter.isMaterialChange(stats(0), stats(0), 
THRESHOLD));
+    }
+
+    @Test
+    public void testMaterialChangeAnyMetric() {
+        SegmentLoadStats last = new SegmentLoadStats(1000, 1000, 1000, 1000);
+        // Only bytesRateOut moves materially; still counts.
+        SegmentLoadStats current = new SegmentLoadStats(1000, 1000, 1000, 
2000);
+        assertTrue(SegmentLoadReporter.isMaterialChange(last, current, 
THRESHOLD));
+    }
+
+    // --- reportIfChanged (against an in-memory store) ---
+
+    @Test
+    public void testFirstReportAlwaysWrites() throws Exception {
+        assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(500)).get());
+        Optional<CacheGetResult<SegmentLoadStats>> got =
+                resources.getSegmentLoadAsync(TOPIC, 0).get();
+        assertTrue(got.isPresent());
+        assertEquals(got.get().getValue().msgRateIn(), 500.0);
+    }
+
+    @Test
+    public void testImmaterialSecondReportSkipped() throws Exception {
+        assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(1000)).get());
+        long firstModified = resources.getSegmentLoadAsync(TOPIC, 0).get()
+                .get().getStat().getModificationTimestamp();
+
+        // +10% is immaterial → no write → stored value and timestamp 
unchanged.
+        assertFalse(reporter.reportIfChanged(TOPIC, 0, stats(1100)).get());
+        CacheGetResult<SegmentLoadStats> after = 
resources.getSegmentLoadAsync(TOPIC, 0).get().get();
+        assertEquals(after.getValue().msgRateIn(), 1000.0);
+        assertEquals(after.getStat().getModificationTimestamp(), 
firstModified);
+    }
+
+    @Test
+    public void testMaterialSecondReportWrites() throws Exception {
+        assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(1000)).get());
+        // +50% is material → write.
+        assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(1500)).get());
+        assertEquals(resources.getSegmentLoadAsync(TOPIC, 
0).get().get().getValue().msgRateIn(),
+                1500.0);
+    }
+
+    @Test
+    public void testForgetReSeedsBaselineFromStore() throws Exception {
+        assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(1000)).get());
+        // Without forget, an immaterial sample is skipped.
+        assertFalse(reporter.reportIfChanged(TOPIC, 0, stats(1050)).get());
+        // After forget (unload + re-acquire), the baseline re-seeds from the 
stored record:
+        // an immaterial sample is still skipped (so the merge window isn't 
reset)...
+        reporter.forget(TOPIC, 0);
+        assertFalse(reporter.reportIfChanged(TOPIC, 0, stats(1050)).get());
+        // ...while a material one writes.
+        assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(2000)).get());
+    }
+
+    /**
+     * A reporter with an empty in-memory baseline (a new segment owner) must
+     * seed that baseline from the stored record instead of blindly rewriting
+     * it.
+     */
+    @Test
+    public void testNewOwnerSeedsBaselineFromStore() throws Exception {
+        // Old owner writes a record.
+        assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(1000)).get());
+        CacheGetResult<SegmentLoadStats> written =
+                resources.getSegmentLoadAsync(TOPIC, 0).get().get();
+
+        // Ownership moves: a fresh reporter (empty lastWritten cache) samples
+        // a rate within the materiality band of the STORED value. It must
+        // seed its baseline from the store and skip the write — otherwise
+        // every rebalance would reset the record's modification time and
+        // starve the controller's merge window.
+        SegmentLoadReporter newOwner =
+                new SegmentLoadReporter(resources, THRESHOLD);
+        assertFalse(newOwner.reportIfChanged(TOPIC, 0, stats(1100)).get());
+        CacheGetResult<SegmentLoadStats> untouched =
+                resources.getSegmentLoadAsync(TOPIC, 0).get().get();
+        assertEquals(untouched.getValue().msgRateIn(), 1000.0);
+        assertEquals(untouched.getStat().getModificationTimestamp(),
+                written.getStat().getModificationTimestamp());
+        // The timestamp has millisecond resolution, so a rewrite landing in
+        // the same millisecond would go unnoticed; the record version does
+        // not have that blind spot.
+        assertEquals(untouched.getStat().getVersion(),
+                written.getStat().getVersion());
+
+        // A materially different sample still writes.
+        assertTrue(newOwner.reportIfChanged(TOPIC, 0, stats(2000)).get());
+    }
+
+    /**
+     * Re-reporting a bit-identical value through the resources layer must
+     * not touch the stored record: neither its modification timestamp nor
+     * its version may change.
+     */
+    @Test
+    public void testIdenticalValueDoesNotBumpModificationTime()
+            throws Exception {
+        assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(1000)).get());
+        CacheGetResult<SegmentLoadStats> before =
+                resources.getSegmentLoadAsync(TOPIC, 0).get().get();
+
+        // Bit-identical re-report through the resources layer is a no-op
+        // write.
+        resources.reportSegmentLoadAsync(TOPIC, 0, stats(1000)).get();
+        CacheGetResult<SegmentLoadStats> after =
+                resources.getSegmentLoadAsync(TOPIC, 0).get().get();
+        assertEquals(after.getStat().getModificationTimestamp(),
+                before.getStat().getModificationTimestamp());
+        // Both writes can land in the same millisecond, in which case the
+        // timestamp alone cannot reveal a rewrite; the version can.
+        assertEquals(after.getStat().getVersion(),
+                before.getStat().getVersion());
+    }
+
+    /**
+     * Deleting a segment load record must succeed both when no record exists
+     * and when one does, and must leave no record behind.
+     */
+    @Test
+    public void testDeleteSegmentLoadToleratesMissing() throws Exception {
+        // No record yet — delete must not fail.
+        resources.deleteSegmentLoadAsync(TOPIC, 7).get();
+        // Require the report to actually write; otherwise the absence check
+        // below would pass without the second delete removing anything.
+        assertTrue(reporter.reportIfChanged(TOPIC, 7, stats(100)).get());
+        resources.deleteSegmentLoadAsync(TOPIC, 7).get();
+        assertFalse(resources.getSegmentLoadAsync(TOPIC, 7).get().isPresent());
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentLoadReporterTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentLoadReporterTest.java
new file mode 100644
index 00000000000..086d82fcce7
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentLoadReporterTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end coverage for the broker-side segment load reporter sweep 
(PIP-483): after a
+ * scalable topic has live segment topics on the broker, the periodic sweep 
(driven manually
+ * here for determinism) must write a {@link 
org.apache.pulsar.common.scalable.SegmentLoadStats}
+ * record to the metadata store for each hosted segment, which is what the 
controller's auto
+ * split/merge reads.
+ */
+public class V5SegmentLoadReporterTest extends V5ClientBaseTest {
+
+    @Test
+    public void testSweepWritesSegmentLoadRecords() throws Exception {
+        String topic = newScalableTopic(2);
+
+        @Cleanup
+        Producer<byte[]> producer = v5Client.newProducer(Schema.bytes())
+                .topic(topic)
+                .create();
+        // Produce across keys so both initial segments get a live segment 
topic on the broker.
+        for (int i = 0; i < 50; i++) {
+            producer.newMessage().key("k-" + i).value(("v-" + 
i).getBytes()).send();
+        }
+
+        ScalableTopicResources resources =
+                getPulsar().getPulsarResources().getScalableTopicResources();
+        TopicName parent = TopicName.get(topic);
+
+        // Drive the sweep directly rather than waiting for the 10s scheduled 
tick.
+        getPulsar().getBrokerService().runSegmentLoadReportOnceForTest();
+
+        // Both initial segments (0 and 1) should now have a load record.
+        Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
+            getPulsar().getBrokerService().runSegmentLoadReportOnceForTest();
+            assertTrue(resources.getSegmentLoadAsync(parent, 
0).get().isPresent(),
+                    "segment 0 load record must be written by the sweep");
+            assertTrue(resources.getSegmentLoadAsync(parent, 
1).get().isPresent(),
+                    "segment 1 load record must be written by the sweep");
+        });
+    }
+
+    /**
+     * Runs the segment load sweep while the broker hosts a plain
+     * {@code persistent://} topic that has received one message.
+     *
+     * <p>NOTE(review): despite the method name, this test makes no assertion
+     * against the metadata store; it only verifies that the sweep completes
+     * without throwing when a non-segment topic is present.
+     */
+    @Test
+    public void testNonSegmentTopicsProduceNoLoadRecords() throws Exception {
+        // A plain persistent topic must not produce any scalable-segment load 
record.
+        @Cleanup
+        org.apache.pulsar.client.api.PulsarClient v4 =
+                org.apache.pulsar.client.api.PulsarClient.builder()
+                        .serviceUrl(getBrokerServiceUrl()).build();
+        String plain = "persistent://" + getNamespace() + "/plain-" + 
System.nanoTime();
+        @Cleanup
+        org.apache.pulsar.client.api.Producer<byte[]> p =
+                v4.newProducer().topic(plain).create();
+        p.send("x".getBytes());
+
+        // Sweep must not throw on non-segment topics (and obviously writes 
nothing for them).
+        getPulsar().getBrokerService().runSegmentLoadReportOnceForTest();
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentLoadStats.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentLoadStats.java
new file mode 100644
index 00000000000..00701568a3e
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentLoadStats.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.scalable;
+
+/**
+ * Per-segment load sample for scalable-topic auto split/merge (PIP-483).
+ *
+ * <p>Written by the broker that owns a segment's {@code segment://} topic, directly to the
+ * metadata store under {@code .../segments/{segmentId}/load}, and read by the controller
+ * leader's auto-scaling evaluator. To keep write volume bounded, the owning broker only
+ * rewrites this record when one of the rates changes by more than a significant threshold
+ * (default ±25%) since the last write — see {@code SegmentLoadReporter}.
+ *
+ * <p>The record carries no timestamp of its own: the metadata store exposes the record's
+ * last-modified time via its {@code Stat}, and the controller uses that for the "cold for
+ * at least mergeWindow" check.
+ *
+ * @param msgRateIn    inbound messages per second (60s rolling average, from {@code TopicStats})
+ * @param bytesRateIn  inbound bytes per second
+ * @param msgRateOut   outbound (dispatched) messages per second, summed across subscriptions
+ * @param bytesRateOut outbound bytes per second
+ */
+public record SegmentLoadStats(
+        double msgRateIn,
+        double bytesRateIn,
+        double msgRateOut,
+        double bytesRateOut
+) {
+    public static final SegmentLoadStats ZERO = new SegmentLoadStats(0, 0, 0, 0);
+}

Reply via email to