This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cb018d904ed [fix][admin] Backlog quota's policy is null which causes a 
NPE (#24192)
cb018d904ed is described below

commit cb018d904ed877a66a26e189201a839d65ca3142
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Fri Apr 25 16:15:06 2025 +0800

    [fix][admin] Backlog quota's policy is null which causes a NPE (#24192)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 11 +++++++
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 13 +++++++--
 .../broker/admin/impl/PersistentTopicsBase.java    | 12 +++++++-
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  2 ++
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 26 +++++++++++------
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 34 ++++++++++++++++++++++
 .../service/ReplicatorTopicPoliciesTest.java       |  2 ++
 .../pulsar/common/policies/data/BacklogQuota.java  |  6 ++++
 8 files changed, 94 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 5f2f031a2d4..fb7679ff269 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -923,10 +923,21 @@ public abstract class AdminResource extends 
PulsarWebResource {
                 == Status.CONFLICT.getStatusCode();
     }
 
+    protected static boolean isBadRequest(Throwable ex) {
+        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+        return realCause instanceof WebApplicationException
+                && ((WebApplicationException) 
realCause).getResponse().getStatus()
+                == Status.BAD_REQUEST.getStatusCode();
+    }
+
     protected static boolean isNot307And404Exception(Throwable ex) {
         return !isRedirectException(ex) && !isNotFoundException(ex);
     }
 
+    protected static boolean isNot307And404And400Exception(Throwable ex) {
+        return !isRedirectException(ex) && !isNotFoundException(ex) && 
!isBadRequest(ex);
+    }
+
     protected static String getTopicNotFoundErrorMessage(String topic) {
         return String.format("Topic %s not found", topic);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index c866d2d6f8a..61db138e60c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1416,6 +1416,13 @@ public abstract class NamespacesBase extends 
AdminResource {
     }
     protected CompletableFuture<Void> setBacklogQuotaAsync(BacklogQuotaType 
backlogQuotaType,
                                                            BacklogQuota quota) 
{
+        try {
+            quota.validate();
+        } catch (IllegalArgumentException e) {
+            RestException restException = new 
RestException(Status.BAD_REQUEST, String.format("Set namespace[%s]"
+                + " backlog quota failed because the data validation failed. 
%s", namespaceName, e.getMessage()));
+            return CompletableFuture.failedFuture(restException);
+        }
         return namespaceResources().setPoliciesAsync(namespaceName, policies 
-> {
             RetentionPolicies retentionPolicies = policies.retention_policies;
             final BacklogQuotaType quotaType = backlogQuotaType != null ? 
backlogQuotaType
@@ -2720,8 +2727,10 @@ public abstract class NamespacesBase extends 
AdminResource {
                             namespaceName, backlogQuota);
                 }).exceptionally(ex -> {
                     resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    log.error("[{}] Failed to update backlog quota map for 
namespace {}",
-                            clientAppId(), namespaceName, ex);
+                    if (isNot307And404And400Exception(ex)) {
+                        log.error("[{}] Failed to update backlog quota map for 
namespace {}",
+                                clientAppId(), namespaceName, ex);
+                    }
                     return null;
                 });
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 7e0837ebc6e..de99dd44626 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3304,6 +3304,16 @@ public class PersistentTopicsBase extends AdminResource {
                                                               BacklogQuotaImpl 
backlogQuota, boolean isGlobal) {
         BacklogQuota.BacklogQuotaType finalBacklogQuotaType = backlogQuotaType 
== null
                 ? BacklogQuota.BacklogQuotaType.destination_storage : 
backlogQuotaType;
+        try {
+            // Null value means delete backlog quota.
+            if (backlogQuota != null) {
+                backlogQuota.validate();
+            }
+        } catch (IllegalArgumentException e) {
+            RestException restException = new 
RestException(Status.BAD_REQUEST, String.format("Set namespace[%s]"
+                + " backlog quota failed because the data validation failed. 
%s", namespaceName, e.getMessage()));
+            return CompletableFuture.failedFuture(restException);
+        }
 
         return validatePoliciesReadOnlyAccessAsync()
                 .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, 
isGlobal))
@@ -4976,7 +4986,7 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void handleTopicPolicyException(String methodName, Throwable 
thr, AsyncResponse asyncResponse) {
         Throwable cause = thr.getCause();
-        if (isNot307And404Exception(cause)) {
+        if (isNot307And404And400Exception(cause)) {
             log.error("[{}] Failed to perform {} on topic {}",
                     clientAppId(), methodName, topicName, cause);
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 87c48e961db..38fcef02057 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -3409,6 +3409,7 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         admin.namespaces().setRetention(ns, new RetentionPolicies(1800, 
10000));
         // set backlog quota.
         admin.namespaces().setBacklogQuota(ns, BacklogQuota.builder()
+                
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold)
                 
.limitSize(backlogQuotaLimitSize).limitTime(backlogQuotaLimitTime).build());
         // Verify result.
         Map<BacklogQuota.BacklogQuotaType, BacklogQuota> map = 
admin.namespaces().getBacklogQuotaMap(ns);
@@ -3417,6 +3418,7 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         BacklogQuota backlogQuota = 
map.get(BacklogQuota.BacklogQuotaType.destination_storage);
         assertEquals(backlogQuota.getLimitSize(), backlogQuotaLimitSize);
         assertEquals(backlogQuota.getLimitTime(), backlogQuotaLimitTime);
+        assertEquals(backlogQuota.getPolicy(), 
BacklogQuota.RetentionPolicy.producer_request_hold);
         // cleanup.
         admin.namespaces().deleteNamespace(ns);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index e9ca122bba1..f6da368ca65 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -3629,29 +3629,37 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         String namespace = "prop-xyz/ns1";
         //test size check.
         admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, 
10));
-        admin.namespaces().setBacklogQuota(namespace, 
BacklogQuota.builder().limitSize(9 * 1024 * 1024).build());
+        admin.namespaces().setBacklogQuota(namespace, 
BacklogQuota.builder().limitSize(9 * 1024 * 1024)
+                
.retentionPolicy(RetentionPolicy.producer_request_hold).build());
         
Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () 
-> {
-            admin.namespaces().setBacklogQuota(namespace, 
BacklogQuota.builder().limitSize(100 * 1024 * 1024).build());
+            admin.namespaces().setBacklogQuota(namespace, 
BacklogQuota.builder().limitSize(100 * 1024 * 1024)
+                    
.retentionPolicy(RetentionPolicy.producer_request_hold).build());
         });
 
         //test time check
         admin.namespaces().setRetention(namespace, new RetentionPolicies(10, 
-1));
-        admin.namespaces().setBacklogQuota(namespace, 
BacklogQuota.builder().limitTime(9 * 60).build());
+        admin.namespaces().setBacklogQuota(namespace, 
BacklogQuota.builder().limitTime(9 * 60)
+                
.retentionPolicy(RetentionPolicy.producer_request_hold).build());
         
Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () 
-> {
-            admin.namespaces().setBacklogQuota(namespace, 
BacklogQuota.builder().limitTime(11 * 60).build());
+            admin.namespaces().setBacklogQuota(namespace, 
BacklogQuota.builder().limitTime(11 * 60)
+                    
.retentionPolicy(RetentionPolicy.producer_request_hold).build());
         });
 
         // test both size and time.
         admin.namespaces().setRetention(namespace, new RetentionPolicies(10, 
10));
-        admin.namespaces().setBacklogQuota(namespace, 
BacklogQuota.builder().limitSize(9 * 1024 * 1024).build());
-        admin.namespaces().setBacklogQuota(namespace, 
BacklogQuota.builder().limitTime(9 * 60).build());
+        admin.namespaces().setBacklogQuota(namespace, 
BacklogQuota.builder().limitSize(9 * 1024 * 1024)
+                
.retentionPolicy(RetentionPolicy.producer_request_hold).build());
+        admin.namespaces().setBacklogQuota(namespace, 
BacklogQuota.builder().limitTime(9 * 60)
+                
.retentionPolicy(RetentionPolicy.producer_request_hold).build());
         admin.namespaces().setBacklogQuota(namespace, 
BacklogQuota.builder().limitSize(9 * 1024 * 1024).
-                limitTime(9 * 60).build());
+                limitTime(9 * 
60).retentionPolicy(RetentionPolicy.producer_request_hold).build());
         
Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () 
-> {
-            admin.namespaces().setBacklogQuota(namespace, 
BacklogQuota.builder().limitSize(100 * 1024 * 1024).build());
+            admin.namespaces().setBacklogQuota(namespace, 
BacklogQuota.builder().limitSize(100 * 1024 * 1024)
+                    
.retentionPolicy(RetentionPolicy.producer_request_hold).build());
         });
         
Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () 
-> {
-            admin.namespaces().setBacklogQuota(namespace, 
BacklogQuota.builder().limitTime(100 * 60).build());
+            admin.namespaces().setBacklogQuota(namespace, 
BacklogQuota.builder().limitTime(100 * 60)
+                    
.retentionPolicy(RetentionPolicy.producer_request_hold).build());
         });
 
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 3419a4b161e..4dbb57b44ae 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -40,6 +40,8 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import javax.ws.rs.BadRequestException;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.Response;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -93,6 +95,8 @@ import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.assertj.core.api.Assertions;
 import org.awaitility.Awaitility;
+import org.glassfish.jersey.client.JerseyClient;
+import org.glassfish.jersey.client.JerseyClientBuilder;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -3286,6 +3290,36 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                 .build());
     }
 
+    /**
+     * Verify: {@link BacklogQuota#getPolicy()} can not be null.
+     */
+    @Test
+    public void testSetNonBacklogQuotType() throws Exception {
+        final NamespaceName ns = NamespaceName.get(myNamespace);
+        final String hostAndPort = pulsar.getWebServiceAddress();
+        final String nsPath = "/admin/v2/namespaces/" + ns + "/backlogQuota";
+        final String topicPath = "/admin/v2/persistent/" + ns + 
"/test-set-backlog-quota/backlogQuota";
+        admin.namespaces().setBacklogQuota(ns.toString(), 
BacklogQuota.builder().limitTime(1).limitSize(1).retentionPolicy(
+                
BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build());
+        BacklogQuota backlogQuotaWithNonPolicy = 
BacklogQuota.builder().limitTime(1).limitSize(1).build();
+        JerseyClient httpClient = JerseyClientBuilder.createClient();
+        // Namespace level.
+        Response response1 = 
httpClient.target(hostAndPort).path(nsPath).request()
+                .header("Content-Type", "application/json")
+                .post(Entity.json(backlogQuotaWithNonPolicy));
+        assertEquals(response1.getStatus(), 400);
+        
assertTrue(response1.getStatusInfo().getReasonPhrase().contains("policy cannot 
be null"));
+        // Topic level.
+        Response response2 = 
httpClient.target(hostAndPort).path(topicPath).request()
+                .header("Content-Type", "application/json")
+                .post(Entity.json(backlogQuotaWithNonPolicy));
+        assertEquals(response2.getStatus(), 400);
+        
assertTrue(response2.getStatusInfo().getReasonPhrase().contains("policy cannot 
be null"));
+        // cleanup.
+        httpClient.close();
+
+    }
+
     @Test
     public void testSetSubRateWithSub() throws Exception {
         String topic = "persistent://" + myNamespace + 
"/testSetSubRateWithSub";
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index ab1f0c0ece2..aeec8fdb4de 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
@@ -81,6 +82,7 @@ public class ReplicatorTopicPoliciesTest extends 
ReplicatorTestBase {
         BacklogQuotaImpl backlogQuota = new BacklogQuotaImpl();
         backlogQuota.setLimitSize(1);
         backlogQuota.setLimitTime(2);
+        
backlogQuota.setPolicy(BacklogQuota.RetentionPolicy.producer_exception);
         // local
         admin1.topicPolicies().setBacklogQuota(topic, backlogQuota);
         Awaitility.await().untilAsserted(() ->
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java
index 4045eade3db..be125f0cf24 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java
@@ -88,4 +88,10 @@ public interface BacklogQuota {
         /** Policy which evicts the oldest message from the slowest consumer's 
backlog. */
         consumer_backlog_eviction,
     }
+
+    default void validate() {
+        if (getPolicy() == null) {
+            throw new IllegalArgumentException("the attribute policy cannot be 
null");
+        }
+    }
 }

Reply via email to