This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fda82bea79 [improve][broker] PIP-483: namespace + topic auto split/merge policy overrides (#26008)
0fda82bea79 is described below

commit 0fda82bea79a4ce4abfec0aa80ca20f11cc386b7
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Jun 12 13:44:32 2026 -0700

    [improve][broker] PIP-483: namespace + topic auto split/merge policy overrides (#26008)
---
 .../broker/resources/ScalableTopicMetadata.java    |   7 +
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  32 +++++
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  97 +++++++++++++
 .../pulsar/broker/admin/v2/ScalableTopics.java     | 151 +++++++++++++++++++++
 .../broker/service/scalable/AutoScaleConfig.java   |  81 ++++++++++-
 .../service/scalable/ScalableTopicController.java  |  67 +++++++--
 .../broker/service/scalable/SegmentLayout.java     |   9 +-
 .../admin/ScalableTopicAutoScalePolicyTest.java    | 133 ++++++++++++++++++
 .../service/scalable/AutoScaleConfigTest.java      |  46 +++++++
 .../ScalableTopicControllerAutoScaleTest.java      | 124 +++++++++++++++++
 .../broker/service/scalable/SegmentLayoutTest.java |   7 +-
 .../org/apache/pulsar/client/admin/Namespaces.java |  67 +++++++++
 .../apache/pulsar/client/admin/ScalableTopics.java |  49 +++++++
 .../policies/data/AutoScalePolicyOverride.java     |  90 ++++++++++++
 .../pulsar/common/policies/data/Policies.java      |   5 +-
 .../policies/data/ScalableTopicMetadata.java       |   6 +
 .../policies/data/ScalableTopicMetadataTest.java   |   5 +-
 .../client/admin/internal/NamespacesImpl.java      |  40 ++++++
 .../client/admin/internal/ScalableTopicsImpl.java  |  38 ++++++
 .../pulsar/common/policies/data/PolicyName.java    |   1 +
 20 files changed, 1035 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java
index 8af85231651..228e2c25e65 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java
@@ -24,6 +24,7 @@ import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
 import org.apache.pulsar.common.scalable.SegmentInfo;
 
 /**
@@ -51,4 +52,10 @@ public class ScalableTopicMetadata {
     /** User-defined topic properties. */
     @Builder.Default
     private Map<String, String> properties = Map.of();
+
+    /**
+     * Per-topic auto split/merge policy override (PIP-483). {@code null} means no override:
+     * the namespace policy and then the broker configuration apply.
+     */
+    private AutoScalePolicyOverride autoScalePolicy;
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index bff682e0d1b..911d3de2b04 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -65,6 +65,7 @@ import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.scalable.AutoScaleConfig;
 import org.apache.pulsar.broker.topiclistlimit.TopicListMemoryLimiter;
 import org.apache.pulsar.broker.topiclistlimit.TopicListSizeResultCache;
 import org.apache.pulsar.broker.web.RestException;
@@ -83,6 +84,7 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
 import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -1307,6 +1309,36 @@ public abstract class NamespacesBase extends AdminResource {
                 }));
     }
 
+    protected CompletableFuture<AutoScalePolicyOverride> 
internalGetScalableTopicAutoScalePolicyAsync() {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.SCALABLE_TOPIC_AUTO_SCALE,
+                PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenApply(policies -> policies.scalableTopicAutoScalePolicy);
+    }
+
+    protected CompletableFuture<Void> 
internalSetScalableTopicAutoScalePolicyAsync(
+            AutoScalePolicyOverride override) {
+        return validateNamespacePolicyOperationAsync(namespaceName,
+                PolicyName.SCALABLE_TOPIC_AUTO_SCALE, PolicyOperation.WRITE)
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenAccept(__ -> {
+                    if (override != null) {
+                        // The override only has to be valid in combination 
with the broker
+                        // defaults — resolve and let the invariant checks 
reject bad values
+                        // (e.g. zero split thresholds, split <= merge 
hysteresis inversion).
+                        try {
+                            AutoScaleConfig.resolve(pulsar().getConfig(), 
override, null);
+                        } catch (IllegalArgumentException e) {
+                            throw new 
RestException(Status.PRECONDITION_FAILED, e.getMessage());
+                        }
+                    }
+                })
+                .thenCompose(__ -> 
namespaceResources().setPoliciesAsync(namespaceName, policies -> {
+                    policies.scalableTopicAutoScalePolicy = override;
+                    return policies;
+                }));
+    }
+
     protected CompletableFuture<Void> 
internalSetAutoSubscriptionCreationAsync(AutoSubscriptionCreationOverride
                                                                
autoSubscriptionCreationOverride) {
         // Force to read the data s.t. the watch to the cache content is setup.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 77c18f4169d..33f4eb76858 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -60,6 +60,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
 import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -880,6 +881,102 @@ public class Namespaces extends NamespacesBase {
                 });
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/scalableTopicAutoScalePolicy")
+    @Operation(summary = "Get the scalable-topic auto split/merge policy 
override for a namespace")
+    @ApiResponses(value = {
+            @ApiResponse(responseCode = "200",
+                    description = "The scalable-topic auto split/merge policy 
override for the namespace",
+                    content = @Content(schema = @Schema(implementation = 
AutoScalePolicyOverride.class))),
+            @ApiResponse(responseCode = "204", description = "No override is 
set on this namespace"),
+            @ApiResponse(responseCode = "403", description = "Don't have admin 
permission"),
+            @ApiResponse(responseCode = "404", description = "Tenant or 
namespace doesn't exist")})
+    public void getScalableTopicAutoScalePolicy(@Suspended AsyncResponse 
asyncResponse,
+                                                @PathParam("tenant") String 
tenant,
+                                                @PathParam("namespace") String 
namespace) {
+        validateNamespaceName(tenant, namespace);
+        internalGetScalableTopicAutoScalePolicyAsync()
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    log.error()
+                            .attr("namespace", namespaceName)
+                            .exception(ex)
+                            .log("Failed to get scalableTopicAutoScalePolicy 
for namespace");
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/scalableTopicAutoScalePolicy")
+    @Operation(summary = "Override the broker's scalable-topic auto 
split/merge settings for a namespace")
+    @ApiResponses(value = {
+            @ApiResponse(responseCode = "204", description = "Operation 
successful"),
+            @ApiResponse(responseCode = "403", description = "Don't have admin 
permission"),
+            @ApiResponse(responseCode = "404", description = "Tenant or 
cluster or namespace doesn't exist"),
+            @ApiResponse(responseCode = "412",
+                    description = "The resolved auto split/merge policy 
violates an invariant")})
+    public void setScalableTopicAutoScalePolicy(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant, @PathParam("namespace") String 
namespace,
+            @RequestBody(description = "Auto split/merge policy override", 
required = true)
+                    AutoScalePolicyOverride override) {
+        validateNamespaceName(tenant, namespace);
+        internalSetScalableTopicAutoScalePolicyAsync(override)
+                .thenAccept(__ -> {
+                    log.info()
+                            .attr("namespace", namespaceName)
+                            .log("Successfully set 
scalableTopicAutoScalePolicy on namespace");
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(e -> {
+                    Throwable ex = FutureUtil.unwrapCompletionException(e);
+                    log.error()
+                            .attr("namespace", namespaceName)
+                            .exception(ex)
+                            .log("Failed to set scalableTopicAutoScalePolicy 
on namespace");
+                    if (ex instanceof 
MetadataStoreException.NotFoundException) {
+                        asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
+                    } else {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    }
+                    return null;
+                });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/scalableTopicAutoScalePolicy")
+    @Operation(summary = "Remove the scalable-topic auto split/merge policy 
override from a namespace")
+    @ApiResponses(value = {
+            @ApiResponse(responseCode = "204", description = "Operation 
successful"),
+            @ApiResponse(responseCode = "403", description = "Don't have admin 
permission"),
+            @ApiResponse(responseCode = "404", description = "Tenant or 
cluster or namespace doesn't exist") })
+    public void removeScalableTopicAutoScalePolicy(@Suspended final 
AsyncResponse asyncResponse,
+                                                   @PathParam("tenant") String 
tenant,
+                                                   @PathParam("namespace") 
String namespace) {
+        validateNamespaceName(tenant, namespace);
+        internalSetScalableTopicAutoScalePolicyAsync(null)
+                .thenAccept(__ -> {
+                    log.info()
+                            .attr("namespace", namespaceName)
+                            .log("Successfully removed 
scalableTopicAutoScalePolicy on namespace");
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(e -> {
+                    Throwable ex = FutureUtil.unwrapCompletionException(e);
+                    log.error()
+                            .attr("namespace", namespaceName)
+                            .exception(ex)
+                            .log("Failed to remove 
scalableTopicAutoScalePolicy on namespace");
+                    if (ex instanceof 
MetadataStoreException.NotFoundException) {
+                        asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
+                    } else {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    }
+                    return null;
+                });
+    }
+
     @POST
     @Path("/{tenant}/{namespace}/autoSubscriptionCreation")
     @Operation(summary = "Override broker's allowAutoSubscriptionCreation 
setting for a namespace")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
index 13afb2abe2e..5a365ffa523 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
@@ -55,13 +55,17 @@ import lombok.CustomLog;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
 import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.scalable.AutoScaleConfig;
 import org.apache.pulsar.broker.service.scalable.ScalableTopicController;
 import org.apache.pulsar.broker.service.scalable.ScalableTopicService;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.scalable.ScalableTopicConstants;
@@ -477,6 +481,153 @@ public class ScalableTopics extends AdminResource {
                 });
     }
 
+    // --- Auto split/merge policy override (PIP-483) ---
+
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/autoScalePolicy")
+    @Operation(summary = "Get the per-topic auto split/merge policy override.")
+    @ApiResponses(value = {
+            @ApiResponse(responseCode = "200", description = "The per-topic 
auto split/merge policy override.",
+                    content = @Content(schema = @Schema(implementation = 
AutoScalePolicyOverride.class))),
+            @ApiResponse(responseCode = "204", description = "No override is 
set on this topic"),
+            @ApiResponse(responseCode = "401",
+                    description = "Don't have permission to administrate 
resources on this tenant"),
+            @ApiResponse(responseCode = "403", description = "Don't have admin 
permission on the namespace"),
+            @ApiResponse(responseCode = "404", description = "Scalable topic 
doesn't exist"),
+            @ApiResponse(responseCode = "500", description = "Internal server 
error")})
+    public void getAutoScalePolicy(
+            @Suspended final AsyncResponse asyncResponse,
+            @Parameter(description = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @Parameter(description = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @Parameter(description = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic) {
+        validateNamespaceName(tenant, namespace);
+        TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, 
encodedTopic);
+
+        validateTopicPolicyOperationAsync(tn, 
PolicyName.SCALABLE_TOPIC_AUTO_SCALE, PolicyOperation.READ)
+                .thenCompose(__ -> 
resources().getScalableTopicMetadataAsync(tn))
+                .thenAccept(optMd -> {
+                    if (optMd.isEmpty()) {
+                        asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND,
+                                "Scalable topic not found: " + tn));
+                    } else {
+                        asyncResponse.resume(optMd.get().getAutoScalePolicy());
+                    }
+                })
+                .exceptionally(ex -> {
+                    log.error().attr("clientAppId", 
clientAppId()).attr("topic", tn)
+                            .exception(ex).log("Failed to get autoScalePolicy 
for scalable topic");
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/autoScalePolicy")
+    @Operation(summary = "Set the per-topic auto split/merge policy override.")
+    @ApiResponses(value = {
+            @ApiResponse(responseCode = "204", description = "Override set 
successfully"),
+            @ApiResponse(responseCode = "401",
+                    description = "Don't have permission to administrate 
resources on this tenant"),
+            @ApiResponse(responseCode = "403", description = "Don't have admin 
permission on the namespace"),
+            @ApiResponse(responseCode = "404", description = "Scalable topic 
doesn't exist"),
+            @ApiResponse(responseCode = "412",
+                    description = "The resolved auto split/merge policy 
violates an invariant"),
+            @ApiResponse(responseCode = "500", description = "Internal server 
error")})
+    public void setAutoScalePolicy(
+            @Suspended final AsyncResponse asyncResponse,
+            @Parameter(description = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @Parameter(description = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @Parameter(description = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @RequestBody(description = "Auto split/merge policy override", 
required = true)
+            AutoScalePolicyOverride override) {
+        validateNamespaceName(tenant, namespace);
+        TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, 
encodedTopic);
+        internalSetAutoScalePolicy(asyncResponse, tn, override);
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/autoScalePolicy")
+    @Operation(summary = "Remove the per-topic auto split/merge policy 
override.")
+    @ApiResponses(value = {
+            @ApiResponse(responseCode = "204", description = "Override removed 
successfully"),
+            @ApiResponse(responseCode = "401",
+                    description = "Don't have permission to administrate 
resources on this tenant"),
+            @ApiResponse(responseCode = "403", description = "Don't have admin 
permission on the namespace"),
+            @ApiResponse(responseCode = "404", description = "Scalable topic 
doesn't exist"),
+            @ApiResponse(responseCode = "500", description = "Internal server 
error")})
+    public void removeAutoScalePolicy(
+            @Suspended final AsyncResponse asyncResponse,
+            @Parameter(description = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @Parameter(description = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @Parameter(description = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic) {
+        validateNamespaceName(tenant, namespace);
+        TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, 
encodedTopic);
+        internalSetAutoScalePolicy(asyncResponse, tn, null);
+    }
+
+    private void internalSetAutoScalePolicy(AsyncResponse asyncResponse, 
TopicName tn,
+                                            AutoScalePolicyOverride override) {
+        validateTopicPolicyOperationAsync(tn, 
PolicyName.SCALABLE_TOPIC_AUTO_SCALE, PolicyOperation.WRITE)
+                .thenCompose(__ -> {
+                    if (override == null) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    // Validate the override in combination with the layers it 
will actually
+                    // be resolved with: the broker defaults AND the current 
namespace
+                    // override — two layers that are each valid against the 
defaults can
+                    // still combine into an invalid policy (e.g. the 
namespace raises a
+                    // merge threshold and the topic lowers the matching split 
threshold).
+                    // This check is best-effort: the namespace override can 
still change
+                    // afterwards, and broker defaults can differ across 
restarts — the
+                    // controller handles a combination that has become 
invalid by falling
+                    // back to disabled (see 
ScalableTopicController.resolveAutoScaleConfig).
+                    return 
pulsar().getPulsarResources().getNamespaceResources()
+                            .getPoliciesAsync(namespaceName)
+                            .thenAccept(optPolicies -> {
+                                AutoScalePolicyOverride nsOverride = 
optPolicies
+                                        .map(p -> 
p.scalableTopicAutoScalePolicy)
+                                        .orElse(null);
+                                try {
+                                    
AutoScaleConfig.resolve(pulsar().getConfig(), nsOverride, override);
+                                } catch (IllegalArgumentException e) {
+                                    throw new 
RestException(Response.Status.PRECONDITION_FAILED,
+                                            e.getMessage());
+                                }
+                            });
+                })
+                .thenCompose(__ -> resources().updateScalableTopicAsync(tn, md 
-> {
+                    md.setAutoScalePolicy(override);
+                    return md;
+                }))
+                .thenAccept(__ -> {
+                    log.info().attr("clientAppId", 
clientAppId()).attr("topic", tn)
+                            .attr("removed", override == null)
+                            .log("Updated autoScalePolicy on scalable topic");
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(e -> {
+                    Throwable ex = FutureUtil.unwrapCompletionException(e);
+                    if (ex instanceof 
MetadataStoreException.NotFoundException) {
+                        asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND,
+                                "Scalable topic not found: " + tn));
+                        return null;
+                    }
+                    log.error().attr("clientAppId", 
clientAppId()).attr("topic", tn)
+                            .exception(ex).log("Failed to update 
autoScalePolicy for scalable topic");
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
     // --- Delete ---
 
     @DELETE
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java
index b739b102a76..848b59cfa14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.scalable;
 import java.time.Duration;
 import lombok.Builder;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
 
 /**
  * Fully-resolved auto split/merge policy for a single scalable topic 
(PIP-483).
@@ -81,6 +82,30 @@ public record AutoScaleConfig(
      * @return the resolved policy reflecting the {@code scalableTopic*} 
settings
      */
     public static AutoScaleConfig fromBrokerConfig(ServiceConfiguration conf) {
+        return brokerDefaults(conf).validated();
+    }
+
+    /**
+     * Resolve the effective policy for a topic: broker defaults, overlaid with the namespace
+     * override, overlaid with the topic override (most-specific wins per field; {@code null}
+     * override fields fall through).
+     *
+     * @param conf              the broker configuration (cluster-wide defaults)
+     * @param namespaceOverride the namespace-level override, or {@code null} if none
+     * @param topicOverride     the topic-level override, or {@code null} if none
+     * @return the validated effective policy
+     * @throws IllegalArgumentException if the resolved policy violates an invariant
+     */
+    public static AutoScaleConfig resolve(ServiceConfiguration conf,
+                                          AutoScalePolicyOverride 
namespaceOverride,
+                                          AutoScalePolicyOverride 
topicOverride) {
+        AutoScaleConfig config = brokerDefaults(conf);
+        config = applyOverride(config, namespaceOverride);
+        config = applyOverride(config, topicOverride);
+        return config.validated();
+    }
+
+    private static AutoScaleConfig brokerDefaults(ServiceConfiguration conf) {
         return AutoScaleConfig.builder()
                 .enabled(conf.isScalableTopicAutoScaleEnabled())
                 .maxSegments(conf.getScalableTopicMaxSegments())
@@ -97,8 +122,60 @@ public record AutoScaleConfig(
                 
.mergeBytesRateIn(conf.getScalableTopicMergeBytesRateInThreshold())
                 
.mergeMsgRateOut(conf.getScalableTopicMergeMsgRateOutThreshold())
                 
.mergeBytesRateOut(conf.getScalableTopicMergeBytesRateOutThreshold())
-                .build()
-                .validated();
+                .build();
+    }
+
+    private static AutoScaleConfig applyOverride(AutoScaleConfig base, 
AutoScalePolicyOverride o) {
+        if (o == null) {
+            return base;
+        }
+        AutoScaleConfigBuilder b = base.toBuilder();
+        if (o.getEnabled() != null) {
+            b.enabled(o.getEnabled());
+        }
+        if (o.getMaxSegments() != null) {
+            b.maxSegments(o.getMaxSegments());
+        }
+        if (o.getMinSegments() != null) {
+            b.minSegments(o.getMinSegments());
+        }
+        if (o.getMaxDagDepth() != null) {
+            b.maxDagDepth(o.getMaxDagDepth());
+        }
+        if (o.getSplitCooldownSeconds() != null) {
+            b.splitCooldown(Duration.ofSeconds(o.getSplitCooldownSeconds()));
+        }
+        if (o.getMergeCooldownSeconds() != null) {
+            b.mergeCooldown(Duration.ofSeconds(o.getMergeCooldownSeconds()));
+        }
+        if (o.getMergeWindowSeconds() != null) {
+            b.mergeWindow(Duration.ofSeconds(o.getMergeWindowSeconds()));
+        }
+        if (o.getSplitMsgRateInThreshold() != null) {
+            b.splitMsgRateIn(o.getSplitMsgRateInThreshold());
+        }
+        if (o.getSplitBytesRateInThreshold() != null) {
+            b.splitBytesRateIn(o.getSplitBytesRateInThreshold());
+        }
+        if (o.getSplitMsgRateOutThreshold() != null) {
+            b.splitMsgRateOut(o.getSplitMsgRateOutThreshold());
+        }
+        if (o.getSplitBytesRateOutThreshold() != null) {
+            b.splitBytesRateOut(o.getSplitBytesRateOutThreshold());
+        }
+        if (o.getMergeMsgRateInThreshold() != null) {
+            b.mergeMsgRateIn(o.getMergeMsgRateInThreshold());
+        }
+        if (o.getMergeBytesRateInThreshold() != null) {
+            b.mergeBytesRateIn(o.getMergeBytesRateInThreshold());
+        }
+        if (o.getMergeMsgRateOutThreshold() != null) {
+            b.mergeMsgRateOut(o.getMergeMsgRateOutThreshold());
+        }
+        if (o.getMergeBytesRateOutThreshold() != null) {
+            b.mergeBytesRateOut(o.getMergeBytesRateOutThreshold());
+        }
+        return b.build();
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index 0b8c70d66b1..a85187df28c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.broker.service.TransportCnx;
 import org.apache.pulsar.common.api.proto.ScalableConsumerType;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.scalable.HashRange;
@@ -359,10 +360,6 @@ public class ScalableTopicController {
         if (brokerConfig == null) {
             return CompletableFuture.completedFuture(null);
         }
-        AutoScaleConfig config = 
AutoScaleConfig.fromBrokerConfig(brokerConfig);
-        if (!config.enabled()) {
-            return CompletableFuture.completedFuture(null);
-        }
         if (!autoScaleInFlight.compareAndSet(false, true)) {
             // Another evaluation or auto operation is already running. Don't 
drop the
             // trigger: mark it pending so the in-flight run re-evaluates on 
completion —
@@ -372,11 +369,18 @@ public class ScalableTopicController {
             return CompletableFuture.completedFuture(null);
         }
         try {
-            return collectConsumerCounts()
-                    .thenCombine(collectLoadSamples(), (consumers, load) ->
-                            AutoScalePolicyEvaluator.decide(currentLayout, 
load, consumers, config,
-                                    clock.millis(), lastSplitAtMs, 
lastMergeAtMs))
-                    .thenCompose(decision -> dispatch(decision, config, 
trigger))
+            return resolveAutoScaleConfig(brokerConfig)
+                    .thenCompose(config -> {
+                        if (!config.enabled()) {
+                            return 
CompletableFuture.<Void>completedFuture(null);
+                        }
+                        return collectConsumerCounts()
+                                .thenCombine(collectLoadSamples(), (consumers, 
load) ->
+                                        
AutoScalePolicyEvaluator.decide(currentLayout, load,
+                                                consumers, config, 
clock.millis(),
+                                                lastSplitAtMs, lastMergeAtMs))
+                                .thenCompose(decision -> dispatch(decision, 
config, trigger));
+                    })
                     .whenComplete((__, ex) -> {
                         autoScaleInFlight.set(false);
                         if (autoScaleReEvaluate.getAndSet(false)) {
@@ -393,6 +397,45 @@ public class ScalableTopicController {
         }
     }
 
+    /**
+     * Resolve the effective auto split/merge policy for this topic: broker 
defaults overlaid
+     * with the namespace-level override ({@code 
Policies.scalableTopicAutoScalePolicy}) and
+     * then the per-topic override ({@code 
ScalableTopicMetadata.autoScalePolicy}). Both reads
+     * are metadata-cache-backed, so this is cheap per evaluation and override 
changes take
+     * effect on the next tick without controller restarts.
+     *
+     * <p>Set-time validation is best-effort only (the namespace override can 
change after a
+     * topic override was validated against it, and broker defaults can change 
across
+     * restarts), so the stored combination can be invalid here. In that case 
auto split/merge
+     * is treated as <b>disabled</b> for the topic — predictable, and loudly 
logged on every
+     * evaluation until an operator fixes the overrides — rather than failing 
the evaluation
+     * chain.
+     */
+    private CompletableFuture<AutoScaleConfig> resolveAutoScaleConfig(
+            ServiceConfiguration brokerConfig) {
+        CompletableFuture<AutoScalePolicyOverride> namespaceOverride =
+                
brokerService.getPulsar().getPulsarResources().getNamespaceResources()
+                        .getPoliciesAsync(topicName.getNamespaceObject())
+                        .thenApply(opt -> opt.map(p -> 
p.scalableTopicAutoScalePolicy)
+                                .orElse(null));
+        CompletableFuture<AutoScalePolicyOverride> topicOverride =
+                resources.getScalableTopicMetadataAsync(topicName)
+                        .thenApply(opt -> 
opt.map(ScalableTopicMetadata::getAutoScalePolicy)
+                                .orElse(null));
+        return namespaceOverride.thenCombine(topicOverride, (ns, topic) -> {
+            try {
+                return AutoScaleConfig.resolve(brokerConfig, ns, topic);
+            } catch (IllegalArgumentException e) {
+                log.warn().attr("reason", e.getMessage())
+                        .log("Resolved auto split/merge policy is invalid; 
treating auto "
+                                + "split/merge as disabled for this topic 
until the namespace "
+                                + "or topic override is fixed");
+                return AutoScaleConfig.fromBrokerConfig(brokerConfig)
+                        .toBuilder().enabled(false).build();
+            }
+        });
+    }
+
     private CompletableFuture<Void> dispatch(AutoScaleDecision decision, 
AutoScaleConfig config,
                                              String trigger) {
         if (decision instanceof AutoScaleDecision.Split split) {
@@ -657,7 +700,7 @@ public class ScalableTopicController {
           .thenCompose(__ -> resources.updateScalableTopicAsync(topicName, md 
-> {
               SegmentLayout latest = SegmentLayout.fromMetadata(md);
               SegmentLayout updated = latest.splitSegment(segmentId, nowMs);
-              return updated.toMetadata(md.getProperties());
+              return updated.toMetadata(md);
           }))
           .thenCompose(__ -> 
resources.getScalableTopicMetadataAsync(topicName, true))
           .thenCompose(optMd -> {
@@ -706,7 +749,7 @@ public class ScalableTopicController {
           .thenCompose(__ -> resources.updateScalableTopicAsync(topicName, md 
-> {
               SegmentLayout latest = SegmentLayout.fromMetadata(md);
               SegmentLayout updated = latest.mergeSegments(segmentId1, 
segmentId2, nowMs);
-              return updated.toMetadata(md.getProperties());
+              return updated.toMetadata(md);
           }))
           .thenCompose(__ -> 
resources.getScalableTopicMetadataAsync(topicName, true))
           .thenCompose(optMd -> {
@@ -1192,7 +1235,7 @@ public class ScalableTopicController {
                     updated = updated.pruneSegment(s.segmentId());
                 }
             }
-            return updated == latest ? md : 
updated.toMetadata(md.getProperties());
+            return updated == latest ? md : updated.toMetadata(md);
         }).thenCompose(__ -> 
resources.getScalableTopicMetadataAsync(topicName, true))
           .thenCompose(optMd -> {
               currentLayout = SegmentLayout.fromMetadata(optMd.orElseThrow());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
index a3f1f0c5ef1..c823fb75e91 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
@@ -276,14 +276,17 @@ public class SegmentLayout {
     }
 
     /**
-     * Convert back to metadata for persistence.
+     * Convert back to metadata for persistence, carrying over the non-layout 
fields
+     * (properties, per-topic auto-scale policy) from the record being 
replaced. Layout
+     * mutations must never silently drop fields they don't model.
      */
-    public ScalableTopicMetadata toMetadata(Map<String, String> properties) {
+    public ScalableTopicMetadata toMetadata(ScalableTopicMetadata original) {
         return ScalableTopicMetadata.builder()
                 .epoch(epoch)
                 .nextSegmentId(nextSegmentId)
                 .segments(new LinkedHashMap<>(allSegments))
-                .properties(properties)
+                .properties(original.getProperties())
+                .autoScalePolicy(original.getAutoScalePolicy())
                 .build();
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicAutoScalePolicyTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicAutoScalePolicyTest.java
new file mode 100644
index 00000000000..85c65f9b9ec
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicAutoScalePolicyTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
+import java.util.UUID;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end coverage for the auto split/merge policy override admin API 
(PIP-483), at both
+ * levels, through the full HTTP path against a real shared broker.
+ *
+ * <p>Endpoints under test:
+ *
+ * <ul>
+ *   <li>namespace: {@code 
/admin/v2/namespaces/{tenant}/{ns}/scalableTopicAutoScalePolicy}</li>
+ *   <li>topic: {@code 
/admin/v2/scalable/{tenant}/{ns}/{topic}/autoScalePolicy}</li>
+ * </ul>
+ */
+public class ScalableTopicAutoScalePolicyTest extends SharedPulsarBaseTest {
+
+    private String newScalableTopic() throws Exception {
+        String topic = "topic://" + getNamespace() + "/autoscale-"
+                + UUID.randomUUID().toString().substring(0, 8);
+        admin.scalableTopics().createScalableTopic(topic, 1);
+        return topic;
+    }
+
+    @Test
+    public void testTopicLevelRoundTrip() throws Exception {
+        String topic = newScalableTopic();
+
+        // No override initially.
+        assertNull(admin.scalableTopics().getAutoScalePolicy(topic));
+
+        AutoScalePolicyOverride override = AutoScalePolicyOverride.builder()
+                .enabled(true)
+                .maxSegments(8)
+                .splitMsgRateInThreshold(5_000.0)
+                .build();
+        admin.scalableTopics().setAutoScalePolicy(topic, override);
+        assertEquals(admin.scalableTopics().getAutoScalePolicy(topic), 
override);
+
+        admin.scalableTopics().removeAutoScalePolicy(topic);
+        assertNull(admin.scalableTopics().getAutoScalePolicy(topic));
+    }
+
+    @Test
+    public void testNamespaceLevelRoundTrip() throws Exception {
+        String namespace = getNamespace();
+
+        
assertNull(admin.namespaces().getScalableTopicAutoScalePolicy(namespace));
+
+        AutoScalePolicyOverride override = AutoScalePolicyOverride.builder()
+                .enabled(false)
+                .build();
+        admin.namespaces().setScalableTopicAutoScalePolicy(namespace, 
override);
+        
assertEquals(admin.namespaces().getScalableTopicAutoScalePolicy(namespace), 
override);
+
+        admin.namespaces().removeScalableTopicAutoScalePolicy(namespace);
+        
assertNull(admin.namespaces().getScalableTopicAutoScalePolicy(namespace));
+    }
+
+    @Test
+    public void testInvalidOverrideRejected() throws Exception {
+        String topic = newScalableTopic();
+
+        // minSegments above the broker-default maxSegments breaks min <= max 
on resolution.
+        AutoScalePolicyOverride bad = AutoScalePolicyOverride.builder()
+                .minSegments(1_000_000)
+                .build();
+        assertThrows(PulsarAdminException.PreconditionFailedException.class,
+                () -> admin.scalableTopics().setAutoScalePolicy(topic, bad));
+        assertThrows(PulsarAdminException.PreconditionFailedException.class,
+                () -> 
admin.namespaces().setScalableTopicAutoScalePolicy(getNamespace(), bad));
+    }
+
+    @Test
+    public void testTopicOverrideValidatedAgainstNamespaceOverride() throws 
Exception {
+        // Each layer is valid against the broker defaults, but combined they 
invert the
+        // hysteresis invariant: ns raises mergeMsgRateIn to 5000, topic lowers
+        // splitMsgRateIn to 2000 → split <= merge. The topic-level set must 
see the
+        // current namespace override and reject with 412.
+        String namespace = getNamespace();
+        String topic = newScalableTopic();
+        try {
+            admin.namespaces().setScalableTopicAutoScalePolicy(namespace,
+                    
AutoScalePolicyOverride.builder().mergeMsgRateInThreshold(5_000.0).build());
+
+            
assertThrows(PulsarAdminException.PreconditionFailedException.class,
+                    () -> admin.scalableTopics().setAutoScalePolicy(topic,
+                            AutoScalePolicyOverride.builder()
+                                    
.splitMsgRateInThreshold(2_000.0).build()));
+
+            // The same topic override is accepted once the conflicting 
namespace layer
+            // is gone.
+            admin.namespaces().removeScalableTopicAutoScalePolicy(namespace);
+            admin.scalableTopics().setAutoScalePolicy(topic,
+                    
AutoScalePolicyOverride.builder().splitMsgRateInThreshold(2_000.0).build());
+        } finally {
+            admin.namespaces().removeScalableTopicAutoScalePolicy(namespace);
+        }
+    }
+
+    @Test
+    public void testTopicLevelOnMissingTopicIs404() {
+        String missing = "topic://" + getNamespace() + "/does-not-exist";
+        assertThrows(PulsarAdminException.NotFoundException.class,
+                () -> admin.scalableTopics().setAutoScalePolicy(missing,
+                        
AutoScalePolicyOverride.builder().enabled(false).build()));
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java
index acc104c2773..7a991834461 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java
@@ -55,6 +55,52 @@ public class AutoScaleConfigTest {
         assertTrue(c.splitBytesRateOut() > c.mergeBytesRateOut());
     }
 
+    @Test
+    public void testResolveLayersOverrides() {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        org.apache.pulsar.common.policies.data.AutoScalePolicyOverride ns =
+                
org.apache.pulsar.common.policies.data.AutoScalePolicyOverride.builder()
+                        .maxSegments(16)
+                        .splitMsgRateInThreshold(20_000.0)
+                        .build();
+        org.apache.pulsar.common.policies.data.AutoScalePolicyOverride topic =
+                
org.apache.pulsar.common.policies.data.AutoScalePolicyOverride.builder()
+                        .maxSegments(4)
+                        .splitCooldownSeconds(5L)
+                        .build();
+
+        AutoScaleConfig c = AutoScaleConfig.resolve(conf, ns, topic);
+        // Topic wins where both set.
+        assertEquals(c.maxSegments(), 4);
+        // Namespace applies where the topic is silent.
+        assertEquals(c.splitMsgRateIn(), 20_000.0);
+        // Topic-only field applies.
+        assertEquals(c.splitCooldown(), Duration.ofSeconds(5));
+        // Untouched fields fall through to the broker defaults.
+        assertEquals(c.mergeCooldown(), Duration.ofMinutes(5));
+        assertTrue(c.enabled());
+    }
+
+    @Test
+    public void testResolveNullOverridesEqualsBrokerConfig() {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        assertEquals(AutoScaleConfig.resolve(conf, null, null),
+                AutoScaleConfig.fromBrokerConfig(conf));
+    }
+
+    @Test
+    public void testResolveRejectsInvalidCombination() {
+        // The override is only invalid in combination: a merge threshold 
set equal to the
+        // broker-default split threshold breaks hysteresis (needs split > merge).
+        ServiceConfiguration conf = new ServiceConfiguration();
+        org.apache.pulsar.common.policies.data.AutoScalePolicyOverride bad =
+                
org.apache.pulsar.common.policies.data.AutoScalePolicyOverride.builder()
+                        
.mergeMsgRateInThreshold(conf.getScalableTopicSplitMsgRateInThreshold())
+                        .build();
+        assertThrows(IllegalArgumentException.class,
+                () -> AutoScaleConfig.resolve(conf, null, bad));
+    }
+
     @Test
     public void testValidationRejectsBadConfig() {
         // Zero split threshold: the evaluator scores rate/threshold, so 0 
would yield
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java
index 74ad5e8a5a3..6b1fc907696 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java
@@ -32,6 +32,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.resources.ScalableTopicResources;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.TransportCnx;
@@ -39,7 +41,10 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.ScalableTopics;
 import org.apache.pulsar.client.admin.Topics;
 import org.apache.pulsar.common.api.proto.ScalableConsumerType;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.scalable.SegmentLoadStats;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.coordination.CoordinationService;
@@ -70,6 +75,10 @@ public class ScalableTopicControllerAutoScaleTest {
     private PulsarService pulsar;
     private ServiceConfiguration config;
     private ScalableTopics scalableTopics;
+    private PulsarResources pulsarResources;
+    private NamespaceResources namespaceResources;
+    /** Namespace policies served by the mocked resources; null = namespace 
has no policies. */
+    private Policies namespacePolicies;
     private ScalableTopicController controller;
     private TopicName topicName;
 
@@ -104,6 +113,19 @@ public class ScalableTopicControllerAutoScaleTest {
         when(pulsar.getBrokerId()).thenReturn(BROKER_ID);
         when(pulsar.getExecutor()).thenReturn(scheduler);
         when(pulsar.getConfig()).thenReturn(config);
+
+        // Namespace policies feed the per-namespace auto-scale override 
resolution.
+        // Default: no policies → broker config applies. Tests install 
overrides via
+        // namespacePolicies. Reset explicitly — TestNG reuses the test 
instance.
+        namespacePolicies = null;
+        pulsarResources = mock(PulsarResources.class);
+        namespaceResources = mock(NamespaceResources.class);
+        when(pulsar.getPulsarResources()).thenReturn(pulsarResources);
+        
when(pulsarResources.getNamespaceResources()).thenReturn(namespaceResources);
+        when(namespaceResources.getPoliciesAsync(any(NamespaceName.class)))
+                .thenAnswer(__ -> CompletableFuture.completedFuture(
+                        Optional.ofNullable(namespacePolicies)));
+
         when(pulsar.getAdminClient()).thenReturn(admin);
         when(admin.topics()).thenReturn(topics);
         when(admin.scalableTopics()).thenReturn(scalableTopics);
@@ -205,6 +227,108 @@ public class ScalableTopicControllerAutoScaleTest {
                         "2 consumers on 1 segment should drive a split"));
     }
 
+    @Test
+    public void testNamespaceOverrideDisablesAutoScale() throws Exception {
+        namespacePolicies = new Policies();
+        namespacePolicies.scalableTopicAutoScalePolicy =
+                AutoScalePolicyOverride.builder().enabled(false).build();
+
+        startController(2);
+        resources.reportSegmentLoadAsync(topicName, 0,
+                new SegmentLoadStats(20_000, 0, 0, 0)).get();
+        controller.evaluateAutoScaleForTest().get();
+        assertEquals(activeSegmentCount(), 2,
+                "namespace override enabled=false must suppress the split");
+    }
+
+    @Test
+    public void testTopicOverrideWinsOverNamespace() throws Exception {
+        // Namespace disables auto-scale; the topic explicitly re-enables it.
+        namespacePolicies = new Policies();
+        namespacePolicies.scalableTopicAutoScalePolicy =
+                AutoScalePolicyOverride.builder().enabled(false).build();
+
+        startController(2);
+        resources.updateScalableTopicAsync(topicName, md -> {
+            
md.setAutoScalePolicy(AutoScalePolicyOverride.builder().enabled(true).build());
+            return md;
+        }).get();
+
+        resources.reportSegmentLoadAsync(topicName, 0,
+                new SegmentLoadStats(20_000, 0, 0, 0)).get();
+        controller.evaluateAutoScaleForTest().get();
+        assertEquals(activeSegmentCount(), 3,
+                "topic override enabled=true must win over the namespace 
disable");
+    }
+
+    @Test
+    public void testTopicOverrideMaxSegmentsCapsSplit() throws Exception {
+        startController(2);
+        resources.updateScalableTopicAsync(topicName, md -> {
+            
md.setAutoScalePolicy(AutoScalePolicyOverride.builder().maxSegments(2).build());
+            return md;
+        }).get();
+
+        resources.reportSegmentLoadAsync(topicName, 0,
+                new SegmentLoadStats(20_000, 0, 0, 0)).get();
+        controller.evaluateAutoScaleForTest().get();
+        assertEquals(activeSegmentCount(), 2,
+                "per-topic maxSegments=2 must cap the split despite the hot 
segment");
+    }
+
+    @Test
+    public void testTopicOverrideSurvivesSplit() throws Exception {
+        // The override must survive a layout mutation (toMetadata must carry 
it over).
+        startController(2);
+        AutoScalePolicyOverride override =
+                AutoScalePolicyOverride.builder().maxSegments(3).build();
+        resources.updateScalableTopicAsync(topicName, md -> {
+            md.setAutoScalePolicy(override);
+            return md;
+        }).get();
+
+        resources.reportSegmentLoadAsync(topicName, 0,
+                new SegmentLoadStats(20_000, 0, 0, 0)).get();
+        controller.evaluateAutoScaleForTest().get();
+        assertEquals(activeSegmentCount(), 3, "first split allowed up to 
maxSegments=3");
+        assertEquals(resources.getScalableTopicMetadataAsync(topicName).get()
+                        .orElseThrow().getAutoScalePolicy(), override,
+                "the per-topic override must survive the split's metadata 
rewrite");
+
+        // At the cap now — further hot reports must not split.
+        resources.reportSegmentLoadAsync(topicName, 1,
+                new SegmentLoadStats(20_000, 0, 0, 0)).get();
+        controller.evaluateAutoScaleForTest().get();
+        assertEquals(activeSegmentCount(), 3, "capped at the per-topic 
maxSegments");
+    }
+
+    @Test
+    public void testInvalidOverrideCombinationFallsBackToDisabled() throws 
Exception {
+        // Namespace and topic overrides that are each valid against the 
broker defaults but
+        // invalid combined: the namespace raises the merge threshold, the 
topic lowers the
+        // matching split threshold below it (hysteresis inversion). The 
controller must not
+        // fail the evaluation chain — it treats auto split/merge as disabled 
until fixed.
+        namespacePolicies = new Policies();
+        namespacePolicies.scalableTopicAutoScalePolicy = 
AutoScalePolicyOverride.builder()
+                .mergeMsgRateInThreshold(5_000.0)
+                .build();
+
+        startController(2);
+        resources.updateScalableTopicAsync(topicName, md -> {
+            md.setAutoScalePolicy(AutoScalePolicyOverride.builder()
+                    .splitMsgRateInThreshold(2_000.0)
+                    .build());
+            return md;
+        }).get();
+
+        resources.reportSegmentLoadAsync(topicName, 0,
+                new SegmentLoadStats(20_000, 0, 0, 0)).get();
+        // Must complete normally (no IllegalArgumentException) and take no 
action.
+        controller.evaluateAutoScaleForTest().get();
+        assertEquals(activeSegmentCount(), 2,
+                "invalid override combination must disable auto split/merge, 
not split");
+    }
+
     @Test
     public void testConsumerBurstConvergesWithoutTicks() throws Exception {
         // A group of consumers joining in quick succession must converge to 
one segment
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
index 364c9299a11..afcab4c8b98 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
@@ -269,14 +269,19 @@ public class SegmentLayoutTest {
     @Test
     public void testToMetadata() {
         ScalableTopicMetadata metadata = 
ScalableTopicController.createInitialMetadata(2, Map.of("key", "value"));
+        // A layout mutation must round-trip every non-layout field, not just 
properties.
+        
metadata.setAutoScalePolicy(org.apache.pulsar.common.policies.data.AutoScalePolicyOverride
+                .builder().enabled(false).maxSegments(8).build());
         SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
         SegmentLayout afterSplit = layout.splitSegment(0, 0L);
 
-        ScalableTopicMetadata restored = afterSplit.toMetadata(Map.of("key", 
"value"));
+        ScalableTopicMetadata restored = afterSplit.toMetadata(metadata);
         assertEquals(restored.getEpoch(), 1);
         assertEquals(restored.getNextSegmentId(), 4);
         assertEquals(restored.getSegments().size(), 4);
         assertEquals(restored.getProperties().get("key"), "value");
+        assertEquals(restored.getAutoScalePolicy(), 
metadata.getAutoScalePolicy(),
+                "split/merge must not drop the per-topic auto-scale policy");
     }
 
     @Test
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index cc3f5a92964..b1063e8b542 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -29,6 +29,7 @@ import 
org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import 
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
 import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -1366,6 +1367,72 @@ public interface Namespaces {
      */
     CompletableFuture<Void> removeAutoTopicCreationAsync(String namespace);
 
+    /**
+     * Sets the scalable-topic auto split/merge policy override for a 
namespace (PIP-483),
+     * overriding the broker's defaults for every scalable topic in the 
namespace that does
+     * not carry its own per-topic override.
+     *
+     * @param namespace
+     *            Namespace name
+     * @param override
+     *            the override; unset fields fall through to the broker 
configuration
+     * @throws PulsarAdminException
+     *             Unexpected error, or the override is invalid once resolved (412)
+     */
+    void setScalableTopicAutoScalePolicy(String namespace, 
AutoScalePolicyOverride override)
+            throws PulsarAdminException;
+
+    /**
+     * Sets the scalable-topic auto split/merge policy override for a 
namespace asynchronously.
+     *
+     * @param namespace
+     *            Namespace name
+     * @param override
+     *            the override; unset fields fall through to the broker 
configuration
+     */
+    CompletableFuture<Void> setScalableTopicAutoScalePolicyAsync(
+            String namespace, AutoScalePolicyOverride override);
+
+    /**
+     * Get the scalable-topic auto split/merge policy override for a namespace.
+     *
+     * @param namespace
+     *            Namespace name
+     * @return the override, or {@code null} if none is set
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    AutoScalePolicyOverride getScalableTopicAutoScalePolicy(String namespace) 
throws PulsarAdminException;
+
+    /**
+     * Get the scalable-topic auto split/merge policy override for a namespace 
asynchronously.
+     *
+     * @param namespace
+     *            Namespace name
+     * @return the override, or {@code null} if none is set
+     */
+    CompletableFuture<AutoScalePolicyOverride> 
getScalableTopicAutoScalePolicyAsync(String namespace);
+
+    /**
+     * Removes the scalable-topic auto split/merge policy override from a 
namespace, letting
+     * the broker configuration apply.
+     *
+     * @param namespace
+     *            Namespace name
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void removeScalableTopicAutoScalePolicy(String namespace) throws 
PulsarAdminException;
+
+    /**
+     * Removes the scalable-topic auto split/merge policy override from a 
namespace
+     * asynchronously.
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Void> removeScalableTopicAutoScalePolicyAsync(String 
namespace);
+
     /**
      * Sets the autoSubscriptionCreation policy for a given namespace, 
overriding broker settings.
      * <p/>
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
index debf54231b9..b2f7632951e 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.admin;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
 import org.apache.pulsar.common.policies.data.ScalableSubscriptionType;
 import org.apache.pulsar.common.policies.data.ScalableTopicMetadata;
 import org.apache.pulsar.common.policies.data.ScalableTopicStats;
@@ -145,6 +146,54 @@ public interface ScalableTopics {
      */
     CompletableFuture<ScalableTopicMetadata> getMetadataAsync(String topic);
 
+    /**
+     * Set the per-topic auto split/merge policy override (PIP-483). Fields 
set here take
+     * precedence; unset fields fall through to the namespace override, then 
the broker defaults.
+     *
+     * @param topic    Topic name in the format "tenant/namespace/topic"
+     * @param override the override to apply
+     */
+    void setAutoScalePolicy(String topic, AutoScalePolicyOverride override) 
throws PulsarAdminException;
+
+    /**
+     * Set the per-topic auto split/merge policy override asynchronously.
+     *
+     * @param topic    Topic name in the format "tenant/namespace/topic"
+     * @param override the override to apply
+     */
+    CompletableFuture<Void> setAutoScalePolicyAsync(String topic, 
AutoScalePolicyOverride override);
+
+    /**
+     * Get the per-topic auto split/merge policy override.
+     *
+     * @param topic Topic name in the format "tenant/namespace/topic"
+     * @return the override, or {@code null} if none is set
+     */
+    AutoScalePolicyOverride getAutoScalePolicy(String topic) throws 
PulsarAdminException;
+
+    /**
+     * Get the per-topic auto split/merge policy override asynchronously.
+     *
+     * @param topic Topic name in the format "tenant/namespace/topic"
+     * @return the override, or {@code null} if none is set
+     */
+    CompletableFuture<AutoScalePolicyOverride> getAutoScalePolicyAsync(String 
topic);
+
+    /**
+     * Remove the per-topic auto split/merge policy override (PIP-483), letting the namespace policy
+     * and broker defaults apply.
+     *
+     * @param topic Topic name in the format "tenant/namespace/topic"
+     */
+    void removeAutoScalePolicy(String topic) throws PulsarAdminException;
+
+    /**
+     * Remove the per-topic auto split/merge policy override (PIP-483) asynchronously.
+     *
+     * @param topic Topic name in the format "tenant/namespace/topic"
+     */
+    CompletableFuture<Void> removeAutoScalePolicyAsync(String topic);
+
     /**
      * Delete a scalable topic and all its underlying segment topics.
      *
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoScalePolicyOverride.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoScalePolicyOverride.java
new file mode 100644
index 00000000000..30ecd78e5a7
--- /dev/null
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoScalePolicyOverride.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Override of the scalable-topic auto split/merge policy (PIP-483), settable at the
+ * namespace level ({@link Policies#scalableTopicAutoScalePolicy}) and at the topic level
+ * (the {@code autoScalePolicy} field of {@link ScalableTopicMetadata}).
+ *
+ * <p>Every field is optional (hence the boxed types): an unset ({@code null}) field falls through to the next
+ * resolution layer — topic override → namespace override → broker configuration. Setting
+ * {@code enabled = false} opts the namespace or topic out of auto split/merge entirely.
+ *
+ * <p>The resolved policy must satisfy the same invariants as the broker configuration
+ * (positive split thresholds, split thresholds strictly above merge thresholds,
+ * {@code minSegments <= maxSegments}, non-negative cooldowns); an override that would
+ * violate them is rejected when it is set.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public final class AutoScalePolicyOverride {
+
+    /** Master switch; {@code false} opts this namespace/topic out of auto split/merge. */
+    private Boolean enabled;
+
+    /** Hard ceiling on active segments; splits stop once reached. */
+    private Integer maxSegments;
+
+    /** Hard floor on active segments; merges stop once reached. */
+    private Integer minSegments;
+
+    /** Max merges in a segment's lineage before merges are disabled for it. */
+    private Integer maxDagDepth;
+
+    /** Minimum seconds between automatic splits (coalesces bursts). */
+    private Long splitCooldownSeconds;
+
+    /** Minimum seconds between automatic merges. */
+    private Long mergeCooldownSeconds;
+
+    /** Seconds a segment pair must stay cold before becoming merge-eligible. */
+    private Long mergeWindowSeconds;
+
+    /** Inbound msg/s above which a segment is split. */
+    private Double splitMsgRateInThreshold;
+
+    /** Inbound bytes/s above which a segment is split. */
+    private Long splitBytesRateInThreshold;
+
+    /** Outbound (dispatched) msg/s above which a segment is split. */
+    private Double splitMsgRateOutThreshold;
+
+    /** Outbound (dispatched) bytes/s above which a segment is split. */
+    private Long splitBytesRateOutThreshold;
+
+    /** Inbound msg/s below which a segment counts as cold for merging. */
+    private Double mergeMsgRateInThreshold;
+
+    /** Inbound bytes/s below which a segment counts as cold for merging. */
+    private Long mergeBytesRateInThreshold;
+
+    /** Outbound (dispatched) msg/s below which a segment counts as cold for merging. */
+    private Double mergeMsgRateOutThreshold;
+
+    /** Outbound (dispatched) bytes/s below which a segment counts as cold for merging. */
+    private Long mergeBytesRateOutThreshold;
+}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index db0aa0620d5..df3cf53a3ca 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -61,6 +61,8 @@ public class Policies {
     public AutoTopicCreationOverride autoTopicCreationOverride = null;
     // If set, it will override the broker settings for allowing auto 
subscription creation
     public AutoSubscriptionCreationOverride autoSubscriptionCreationOverride = 
null;
+    // If set, it will override the broker's scalable-topic auto split/merge settings (PIP-483)
+    public AutoScalePolicyOverride scalableTopicAutoScalePolicy = null;
     public Map<String, PublishRate> publishMaxMessageRate = new HashMap<>();
 
     @SuppressWarnings("checkstyle:MemberName")
@@ -158,7 +160,7 @@ public class Policies {
                 backlog_quota_map, publishMaxMessageRate, clusterDispatchRate,
                 topicDispatchRate, subscriptionDispatchRate, 
replicatorDispatchRate,
                 clusterSubscribeRate, deduplicationEnabled, 
autoTopicCreationOverride,
-                autoSubscriptionCreationOverride, persistence,
+                autoSubscriptionCreationOverride, 
scalableTopicAutoScalePolicy, persistence,
                 bundles, latency_stats_sample_rate,
                 message_ttl_in_seconds, subscription_expiration_time_minutes, 
retention_policies,
                 encryption_required, delayed_delivery_policies, 
inactive_topic_policies,
@@ -198,6 +200,7 @@ public class Policies {
                     && Objects.equals(deduplicationEnabled, 
other.deduplicationEnabled)
                     && Objects.equals(autoTopicCreationOverride, 
other.autoTopicCreationOverride)
                     && Objects.equals(autoSubscriptionCreationOverride, 
other.autoSubscriptionCreationOverride)
+                    && Objects.equals(scalableTopicAutoScalePolicy, 
other.scalableTopicAutoScalePolicy)
                     && Objects.equals(persistence, other.persistence) && 
Objects.equals(bundles, other.bundles)
                     && Objects.equals(latency_stats_sample_rate, 
other.latency_stats_sample_rate)
                     && Objects.equals(message_ttl_in_seconds,
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java
index 6f3123228f1..ce246a1340d 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java
@@ -54,6 +54,12 @@ public class ScalableTopicMetadata {
     @Builder.Default
     private Map<String, String> properties = Map.of();
 
+    /**
+     * Per-topic auto split/merge policy override (PIP-483). {@code null} means no override: the
+     * namespace policy and then the broker configuration apply; unset fields of an override fall through too.
+     */
+    private AutoScalePolicyOverride autoScalePolicy;
+
     /**
      * Describes a single segment in a scalable topic's DAG.
      */
diff --git a/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java b/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java
index 4784ea9b1b9..6259d9799a4 100644
--- a/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java
+++ b/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java
@@ -79,13 +79,16 @@ public class ScalableTopicMetadataTest {
         segments.put(0L, activeSegment(0L, 0, 0xFFFF, 0L));
 
         Map<String, String> props = Map.of("retention", "7d");
+        AutoScalePolicyOverride autoScalePolicy =
+                AutoScalePolicyOverride.builder().enabled(false).build();
 
-        ScalableTopicMetadata md = new ScalableTopicMetadata(3L, 1L, segments, 
props);
+        ScalableTopicMetadata md = new ScalableTopicMetadata(3L, 1L, segments, 
props, autoScalePolicy);
 
         assertEquals(md.getEpoch(), 3L);
         assertEquals(md.getNextSegmentId(), 1L);
         assertEquals(md.getSegments(), segments);
         assertEquals(md.getProperties(), props);
+        assertEquals(md.getAutoScalePolicy(), autoScalePolicy);
     }
 
     @Test
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 75a331a45db..fd0e5c5bec0 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
 import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -566,6 +567,45 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
         return asyncDeleteRequest(path);
     }
 
+    @Override
+    public void setScalableTopicAutoScalePolicy(String namespace,
+            AutoScalePolicyOverride override) throws PulsarAdminException {
+        sync(() -> setScalableTopicAutoScalePolicyAsync(namespace, override)); // delegates to the async variant
+    }
+
+    @Override
+    public CompletableFuture<Void> setScalableTopicAutoScalePolicyAsync(
+            String namespace, AutoScalePolicyOverride override) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "scalableTopicAutoScalePolicy"); // target of the JSON POST below
+        return asyncPostRequest(path, Entity.entity(override, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public AutoScalePolicyOverride getScalableTopicAutoScalePolicy(String namespace)
+            throws PulsarAdminException {
+        return sync(() -> getScalableTopicAutoScalePolicyAsync(namespace)); // returns the async variant's result
+    }
+
+    @Override
+    public CompletableFuture<AutoScalePolicyOverride> getScalableTopicAutoScalePolicyAsync(String namespace) {
+        return asyncGetNamespaceParts(new FutureCallback<AutoScalePolicyOverride>() {
+        }, namespace, // empty anonymous FutureCallback; presumably conveys the response type — confirm
+                "scalableTopicAutoScalePolicy");
+    }
+
+    @Override
+    public void removeScalableTopicAutoScalePolicy(String namespace) throws PulsarAdminException {
+        sync(() -> removeScalableTopicAutoScalePolicyAsync(namespace)); // delegates to the async variant
+    }
+
+    @Override
+    public CompletableFuture<Void> removeScalableTopicAutoScalePolicyAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "scalableTopicAutoScalePolicy"); // same sub-path as the set/get calls
+        return asyncDeleteRequest(path);
+    }
+
     @Override
     public void setAutoSubscriptionCreation(String namespace,
             AutoSubscriptionCreationOverride autoSubscriptionCreationOverride) 
throws PulsarAdminException {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
index e11c6cfe747..e4905aa63c8 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.client.admin.ScalableTopics;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
 import org.apache.pulsar.common.policies.data.ScalableTopicMetadata;
 import org.apache.pulsar.common.policies.data.ScalableTopicStats;
 
@@ -141,6 +142,43 @@ public class ScalableTopicsImpl extends BaseResource implements ScalableTopics {
         return asyncGetRequest(topicPath(tn), ScalableTopicMetadata.class);
     }
 
+    // --- Auto split/merge policy override (PIP-483) ---
+
+    @Override
+    public void setAutoScalePolicy(String topic, AutoScalePolicyOverride override)
+            throws PulsarAdminException {
+        sync(() -> setAutoScalePolicyAsync(topic, override)); // delegates to the async variant
+    }
+
+    @Override
+    public CompletableFuture<Void> setAutoScalePolicyAsync(String topic, AutoScalePolicyOverride override) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn).path("autoScalePolicy"); // target of the JSON POST below
+        return asyncPostRequest(path, Entity.entity(override, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public AutoScalePolicyOverride getAutoScalePolicy(String topic) throws PulsarAdminException {
+        return sync(() -> getAutoScalePolicyAsync(topic)); // returns the async variant's result
+    }
+
+    @Override
+    public CompletableFuture<AutoScalePolicyOverride> getAutoScalePolicyAsync(String topic) {
+        TopicName tn = validateTopic(topic); // name used to build the REST path below
+        return asyncGetRequest(topicPath(tn).path("autoScalePolicy"), AutoScalePolicyOverride.class);
+    }
+
+    @Override
+    public void removeAutoScalePolicy(String topic) throws PulsarAdminException {
+        sync(() -> removeAutoScalePolicyAsync(topic)); // delegates to the async variant
+    }
+
+    @Override
+    public CompletableFuture<Void> removeAutoScalePolicyAsync(String topic) {
+        TopicName tn = validateTopic(topic);
+        return asyncDeleteRequest(topicPath(tn).path("autoScalePolicy")); // same sub-path as the set/get calls
+    }
+
     // --- Delete ---
 
     @Override
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
index fd05ee40e76..2a2b56c599b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -55,6 +55,7 @@ public enum PolicyName {
     DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT,
     ALLOW_CLUSTERS,
     ALLOW_CUSTOM_METRIC_LABELS,
+    SCALABLE_TOPIC_AUTO_SCALE,
 
     // cluster policies
     CLUSTER_MIGRATION,

Reply via email to