This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new cb018d904ed [fix][admin] Backlog quota's policy is null which causes a NPE (#24192) cb018d904ed is described below commit cb018d904ed877a66a26e189201a839d65ca3142 Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Fri Apr 25 16:15:06 2025 +0800 [fix][admin] Backlog quota's policy is null which causes a NPE (#24192) --- .../apache/pulsar/broker/admin/AdminResource.java | 11 +++++++ .../pulsar/broker/admin/impl/NamespacesBase.java | 13 +++++++-- .../broker/admin/impl/PersistentTopicsBase.java | 12 +++++++- .../apache/pulsar/broker/admin/AdminApi2Test.java | 2 ++ .../apache/pulsar/broker/admin/AdminApiTest.java | 26 +++++++++++------ .../pulsar/broker/admin/TopicPoliciesTest.java | 34 ++++++++++++++++++++++ .../service/ReplicatorTopicPoliciesTest.java | 2 ++ .../pulsar/common/policies/data/BacklogQuota.java | 6 ++++ 8 files changed, 94 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 5f2f031a2d4..fb7679ff269 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -923,10 +923,21 @@ public abstract class AdminResource extends PulsarWebResource { == Status.CONFLICT.getStatusCode(); } + protected static boolean isBadRequest(Throwable ex) { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + return realCause instanceof WebApplicationException + && ((WebApplicationException) realCause).getResponse().getStatus() + == Status.BAD_REQUEST.getStatusCode(); + } + protected static boolean isNot307And404Exception(Throwable ex) { return !isRedirectException(ex) && !isNotFoundException(ex); } + protected static boolean isNot307And404And400Exception(Throwable ex) { + return !isRedirectException(ex) && !isNotFoundException(ex) && !isBadRequest(ex); + } + protected static String getTopicNotFoundErrorMessage(String topic) { return String.format("Topic %s not found", topic); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index c866d2d6f8a..61db138e60c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1416,6 +1416,13 @@ public abstract class NamespacesBase extends AdminResource { } protected CompletableFuture<Void> setBacklogQuotaAsync(BacklogQuotaType backlogQuotaType, BacklogQuota quota) { + try { + quota.validate(); + } catch (IllegalArgumentException e) { + RestException restException = new RestException(Status.BAD_REQUEST, String.format("Set namespace[%s]" + + " backlog quota failed because the data validation failed. %s", namespaceName, e.getMessage())); + return CompletableFuture.failedFuture(restException); + } return namespaceResources().setPoliciesAsync(namespaceName, policies -> { RetentionPolicies retentionPolicies = policies.retention_policies; final BacklogQuotaType quotaType = backlogQuotaType != null ? backlogQuotaType @@ -2720,8 +2727,10 @@ public abstract class NamespacesBase extends AdminResource { namespaceName, backlogQuota); }).exceptionally(ex -> { resumeAsyncResponseExceptionally(asyncResponse, ex); - log.error("[{}] Failed to update backlog quota map for namespace {}", - clientAppId(), namespaceName, ex); + if (isNot307And404And400Exception(ex)) { + log.error("[{}] Failed to update backlog quota map for namespace {}", + clientAppId(), namespaceName, ex); + } return null; }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 7e0837ebc6e..de99dd44626 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -3304,6 +3304,16 @@ public class PersistentTopicsBase extends AdminResource { BacklogQuotaImpl backlogQuota, boolean isGlobal) { BacklogQuota.BacklogQuotaType finalBacklogQuotaType = backlogQuotaType == null ? BacklogQuota.BacklogQuotaType.destination_storage : backlogQuotaType; + try { + // Null value means delete backlog quota. + if (backlogQuota != null) { + backlogQuota.validate(); + } + } catch (IllegalArgumentException e) { + RestException restException = new RestException(Status.BAD_REQUEST, String.format("Set namespace[%s]" + + " backlog quota failed because the data validation failed. %s", namespaceName, e.getMessage())); + return CompletableFuture.failedFuture(restException); + } return validatePoliciesReadOnlyAccessAsync() .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal)) @@ -4976,7 +4986,7 @@ public class PersistentTopicsBase extends AdminResource { protected void handleTopicPolicyException(String methodName, Throwable thr, AsyncResponse asyncResponse) { Throwable cause = thr.getCause(); - if (isNot307And404Exception(cause)) { + if (isNot307And404And400Exception(cause)) { log.error("[{}] Failed to perform {} on topic {}", clientAppId(), methodName, topicName, cause); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 87c48e961db..38fcef02057 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -3409,6 +3409,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { admin.namespaces().setRetention(ns, new RetentionPolicies(1800, 10000)); // set backlog quota. admin.namespaces().setBacklogQuota(ns, BacklogQuota.builder() + .retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold) .limitSize(backlogQuotaLimitSize).limitTime(backlogQuotaLimitTime).build()); // Verify result. Map<BacklogQuota.BacklogQuotaType, BacklogQuota> map = admin.namespaces().getBacklogQuotaMap(ns); @@ -3417,6 +3418,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { BacklogQuota backlogQuota = map.get(BacklogQuota.BacklogQuotaType.destination_storage); assertEquals(backlogQuota.getLimitSize(), backlogQuotaLimitSize); assertEquals(backlogQuota.getLimitTime(), backlogQuotaLimitTime); + assertEquals(backlogQuota.getPolicy(), BacklogQuota.RetentionPolicy.producer_request_hold); // cleanup. admin.namespaces().deleteNamespace(ns); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index e9ca122bba1..f6da368ca65 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -3629,29 +3629,37 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { String namespace = "prop-xyz/ns1"; //test size check. admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, 10)); - admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(9 * 1024 * 1024).build()); + admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(9 * 1024 * 1024) + .retentionPolicy(RetentionPolicy.producer_request_hold).build()); Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> { - admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(100 * 1024 * 1024).build()); + admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(100 * 1024 * 1024) + .retentionPolicy(RetentionPolicy.producer_request_hold).build()); }); //test time check admin.namespaces().setRetention(namespace, new RetentionPolicies(10, -1)); - admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(9 * 60).build()); + admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(9 * 60) + .retentionPolicy(RetentionPolicy.producer_request_hold).build()); Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> { - admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(11 * 60).build()); + admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(11 * 60) + .retentionPolicy(RetentionPolicy.producer_request_hold).build()); }); // test both size and time. admin.namespaces().setRetention(namespace, new RetentionPolicies(10, 10)); - admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(9 * 1024 * 1024).build()); - admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(9 * 60).build()); + admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(9 * 1024 * 1024) + .retentionPolicy(RetentionPolicy.producer_request_hold).build()); + admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(9 * 60) + .retentionPolicy(RetentionPolicy.producer_request_hold).build()); admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(9 * 1024 * 1024). - limitTime(9 * 60).build()); + limitTime(9 * 60).retentionPolicy(RetentionPolicy.producer_request_hold).build()); Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> { - admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(100 * 1024 * 1024).build()); + admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(100 * 1024 * 1024) + .retentionPolicy(RetentionPolicy.producer_request_hold).build()); }); Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> { - admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(100 * 60).build()); + admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(100 * 60) + .retentionPolicy(RetentionPolicy.producer_request_hold).build()); }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 3419a4b161e..4dbb57b44ae 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -40,6 +40,8 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import javax.ws.rs.BadRequestException; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.Response; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -93,6 +95,8 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; +import org.glassfish.jersey.client.JerseyClient; +import org.glassfish.jersey.client.JerseyClientBuilder; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -3286,6 +3290,36 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { .build()); } + /** + * Verify: {@link BacklogQuota#getPolicy()} can not be null. + */ + @Test + public void testSetNonBacklogQuotType() throws Exception { + final NamespaceName ns = NamespaceName.get(myNamespace); + final String hostAndPort = pulsar.getWebServiceAddress(); + final String nsPath = "/admin/v2/namespaces/" + ns + "/backlogQuota"; + final String topicPath = "/admin/v2/persistent/" + ns + "/test-set-backlog-quota/backlogQuota"; + admin.namespaces().setBacklogQuota(ns.toString(), BacklogQuota.builder().limitTime(1).limitSize(1).retentionPolicy( + BacklogQuota.RetentionPolicy.consumer_backlog_eviction).build()); + BacklogQuota backlogQuotaWithNonPolicy = BacklogQuota.builder().limitTime(1).limitSize(1).build(); + JerseyClient httpClient = JerseyClientBuilder.createClient(); + // Namespace level. + Response response1 = httpClient.target(hostAndPort).path(nsPath).request() + .header("Content-Type", "application/json") + .post(Entity.json(backlogQuotaWithNonPolicy)); + assertEquals(response1.getStatus(), 400); + assertTrue(response1.getStatusInfo().getReasonPhrase().contains("policy cannot be null")); + // Topic level. + Response response2 = httpClient.target(hostAndPort).path(topicPath).request() + .header("Content-Type", "application/json") + .post(Entity.json(backlogQuotaWithNonPolicy)); + assertEquals(response2.getStatus(), 400); + assertTrue(response2.getStatusInfo().getReasonPhrase().contains("policy cannot be null")); + // cleanup. + httpClient.close(); + + } + @Test public void testSetSubRateWithSub() throws Exception { String topic = "persistent://" + myNamespace + "/testSetSubRateWithSub"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java index ab1f0c0ece2..aeec8fdb4de 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java @@ -35,6 +35,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; +import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; @@ -81,6 +82,7 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { BacklogQuotaImpl backlogQuota = new BacklogQuotaImpl(); backlogQuota.setLimitSize(1); backlogQuota.setLimitTime(2); + backlogQuota.setPolicy(BacklogQuota.RetentionPolicy.producer_exception); // local admin1.topicPolicies().setBacklogQuota(topic, backlogQuota); Awaitility.await().untilAsserted(() -> diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java index 4045eade3db..be125f0cf24 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java @@ -88,4 +88,10 @@ public interface BacklogQuota { /** Policy which evicts the oldest message from the slowest consumer's backlog. */ consumer_backlog_eviction, } + + default void validate() { + if (getPolicy() == null) { + throw new IllegalArgumentException("the attribute policy cannot be null"); + } + } }