This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new c0e5ce5ce07 [fix][broker]Global topic policies do not affect after unloading topic and persistence global topic policies never affect (#24279) c0e5ce5ce07 is described below commit c0e5ce5ce07186aa8804840c4c3d48368bbdc792 Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Fri May 16 13:35:30 2025 +0800 [fix][broker]Global topic policies do not affect after unloading topic and persistence global topic policies never affect (#24279) (cherry picked from commit 46c2b74f4c9727d6399e9a1b48b031696d8c7cfd) --- .../pulsar/broker/service/BrokerService.java | 89 ++++++++++------- .../SystemTopicBasedTopicPoliciesService.java | 14 +++ .../broker/service/persistent/PersistentTopic.java | 12 ++- .../pulsar/broker/admin/TopicPoliciesTest.java | 108 +++++++++++++++++++++ .../pulsar/broker/service/BrokerServiceTest.java | 2 +- 5 files changed, 184 insertions(+), 41 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 4ccb111aa00..b48159d7ab5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -94,6 +94,7 @@ import org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -1101,7 +1102,10 @@ public class BrokerService implements Closeable { if (!exists && !createIfMissing) { return CompletableFuture.completedFuture(Optional.empty()); } - return getTopicPoliciesBypassSystemTopic(topicName).exceptionally(ex -> { + // The topic level policies are not needed now, but the meaning of calling + // "getTopicPoliciesBypassSystemTopic" will wait for system topic policies initialization. + return getTopicPoliciesBypassSystemTopic(topicName, false) + .exceptionally(ex -> { final Throwable rc = FutureUtil.unwrapCompletionException(ex); final String errorInfo = String.format("Topic creation encountered an exception by initialize" + " topic policies service. topic_name=%s error_message=%s", topicName, @@ -1109,7 +1113,6 @@ public class BrokerService implements Closeable { log.error(errorInfo, rc); throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo)); }).thenCompose(optionalTopicPolicies -> { - final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null); if (topicName.isPartitioned()) { final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName()); return fetchPartitionedTopicMetadataAsync(topicNameEntity) @@ -1120,7 +1123,7 @@ public class BrokerService implements Closeable { || topicName.getPartitionIndex() < metadata.partitions) { return topics.computeIfAbsent(topicName.toString(), (tpName) -> loadOrCreatePersistentTopic(tpName, - createIfMissing, properties, topicPolicies)); + createIfMissing, properties)); } else { final String errorMsg = String.format("Illegal topic partition name %s with max allowed " @@ -1132,7 +1135,7 @@ public class BrokerService implements Closeable { }); } else { return topics.computeIfAbsent(topicName.toString(), (tpName) -> - loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies)); + loadOrCreatePersistentTopic(tpName, createIfMissing, properties)); } }); }); @@ -1203,13 +1206,14 @@ public class BrokerService implements Closeable { } } - private CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesBypassSystemTopic(@Nonnull TopicName topicName) { + private CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesBypassSystemTopic(@Nonnull TopicName topicName, + boolean isGlobal) { Objects.requireNonNull(topicName); final ServiceConfiguration serviceConfiguration = pulsar.getConfiguration(); if (serviceConfiguration.isSystemTopicEnabled() && serviceConfiguration.isTopicLevelPoliciesEnabled() && !NamespaceService.isSystemServiceNamespace(topicName.getNamespace()) && !SystemTopicNames.isTopicPoliciesSystemTopic(topicName.toString())) { - return pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName); + return pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName, isGlobal); } else { return CompletableFuture.completedFuture(Optional.empty()); } @@ -1620,7 +1624,7 @@ public class BrokerService implements Closeable { * @throws RuntimeException */ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final String topic, - boolean createIfMissing, Map<String, String> properties, @Nullable TopicPolicies topicPolicies) { + boolean createIfMissing, Map<String, String> properties) { final CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.createFutureWithTimeout( Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(), () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION); @@ -1636,7 +1640,7 @@ public class BrokerService implements Closeable { if (topicLoadSemaphore.tryAcquire()) { checkOwnershipAndCreatePersistentTopic(topic, createIfMissing, topicFuture, - properties, topicPolicies); + properties); topicFuture.handle((persistentTopic, ex) -> { // release permit and process pending topic topicLoadSemaphore.release(); @@ -1645,7 +1649,7 @@ public class BrokerService implements Closeable { }); } else { pendingTopicLoadingQueue.add(new TopicLoadingContext(topic, - topicFuture, properties, topicPolicies)); + topicFuture, properties)); if (log.isDebugEnabled()) { log.debug("topic-loading for {} added into pending queue", topic); } @@ -1687,7 +1691,7 @@ public class BrokerService implements Closeable { private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean createIfMissing, CompletableFuture<Optional<Topic>> topicFuture, - Map<String, String> properties, @Nullable TopicPolicies topicPolicies) { + Map<String, String> properties) { TopicName topicName = TopicName.get(topic); pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName) .thenAccept(isActive -> { @@ -1701,8 +1705,8 @@ public class BrokerService implements Closeable { } propertiesFuture.thenAccept(finalProperties -> //TODO add topicName in properties? - createPersistentTopic(topic, createIfMissing, topicFuture, - finalProperties, topicPolicies) + createPersistentTopic0(topic, createIfMissing, topicFuture, + finalProperties) ).exceptionally(throwable -> { log.warn("[{}] Read topic property failed", topic, throwable); pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); @@ -1723,17 +1727,10 @@ public class BrokerService implements Closeable { }); } - @VisibleForTesting public void createPersistentTopic0(final String topic, boolean createIfMissing, CompletableFuture<Optional<Topic>> topicFuture, Map<String, String> properties) { - createPersistentTopic(topic, createIfMissing, topicFuture, properties, null); - } - - private void createPersistentTopic(final String topic, boolean createIfMissing, - CompletableFuture<Optional<Topic>> topicFuture, - Map<String, String> properties, @Nullable TopicPolicies topicPolicies) { TopicName topicName = TopicName.get(topic); final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); @@ -1750,7 +1747,7 @@ public class BrokerService implements Closeable { : CompletableFuture.completedFuture(null); maxTopicsCheck.thenCompose(__ -> - getManagedLedgerConfig(topicName, topicPolicies)).thenAccept(managedLedgerConfig -> { + getManagedLedgerConfig(topicName)).thenAccept(managedLedgerConfig -> { if (isBrokerEntryMetadataEnabled() || isBrokerPayloadProcessorEnabled()) { // init managedLedger interceptor Set<BrokerEntryMetadataInterceptor> interceptors = new HashSet<>(); @@ -1884,31 +1881,45 @@ public class BrokerService implements Closeable { }); } - public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull TopicName topicName) { - final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture = - getTopicPoliciesBypassSystemTopic(topicName); - return topicPoliciesFuture.thenCompose(optionalTopicPolicies -> - getManagedLedgerConfig(topicName, optionalTopicPolicies.orElse(null))); - } - - private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull TopicName topicName, - @Nullable TopicPolicies topicPolicies) { + public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName topicName) { requireNonNull(topicName); NamespaceName namespace = topicName.getNamespaceObject(); ServiceConfiguration serviceConfig = pulsar.getConfiguration(); NamespaceResources nsr = pulsar.getPulsarResources().getNamespaceResources(); LocalPoliciesResources lpr = pulsar.getPulsarResources().getLocalPolicies(); + final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture = + getTopicPoliciesBypassSystemTopic(topicName, false); + final CompletableFuture<Optional<TopicPolicies>> globalTopicPoliciesFuture = + getTopicPoliciesBypassSystemTopic(topicName, true); final CompletableFuture<Optional<Policies>> nsPolicies = nsr.getPoliciesAsync(namespace); final CompletableFuture<Optional<LocalPolicies>> lcPolicies = lpr.getLocalPoliciesAsync(namespace); - return nsPolicies.thenCombine(lcPolicies, (policies, localPolicies) -> { + return topicPoliciesFuture.thenCombine(globalTopicPoliciesFuture, (topicP, globalTopicP) -> { + return new ImmutablePair<>(topicP, globalTopicP); + }).thenCombine(nsPolicies, (topicPoliciesPair, np) -> { + return new ImmutablePair<>(topicPoliciesPair, np); + }).thenCombine(lcPolicies, (combined, localPolicies) -> { + Optional<TopicPolicies> topicP = combined.getLeft().getLeft(); + Optional<TopicPolicies> globalTopicP = combined.getLeft().getRight(); + Optional<Policies> policies = combined.getRight(); + PersistencePolicies persistencePolicies = null; RetentionPolicies retentionPolicies = null; OffloadPoliciesImpl topicLevelOffloadPolicies = null; - if (topicPolicies != null) { - persistencePolicies = topicPolicies.getPersistence(); - retentionPolicies = topicPolicies.getRetentionPolicies(); - topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies(); + if (topicP.isPresent() && topicP.get().getPersistence() != null) { + persistencePolicies = topicP.get().getPersistence(); + } else if (globalTopicP.isPresent() && globalTopicP.get().getPersistence() != null) { + persistencePolicies = globalTopicP.get().getPersistence(); + } + if (topicP.isPresent() && topicP.get().getRetentionPolicies() != null) { + retentionPolicies = topicP.get().getRetentionPolicies(); + } else if (globalTopicP.isPresent() && globalTopicP.get().getRetentionPolicies() != null) { + retentionPolicies = globalTopicP.get().getRetentionPolicies(); + } + if (topicP.isPresent() && topicP.get().getOffloadPolicies() != null) { + topicLevelOffloadPolicies = topicP.get().getOffloadPolicies(); + } else if (globalTopicP.isPresent() && globalTopicP.get().getOffloadPolicies() != null) { + topicLevelOffloadPolicies = globalTopicP.get().getOffloadPolicies(); } if (persistencePolicies == null) { @@ -2057,6 +2068,13 @@ public class BrokerService implements Closeable { managedLedgerConfig.setNewEntriesCheckDelayInMillis( serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis()); return managedLedgerConfig; + }).exceptionally(ex -> { + final Throwable rc = FutureUtil.unwrapCompletionException(ex); + final String errorInfo = String.format("Topic creation encountered an exception by initialize" + + " topic policies service. topic_name=%s error_message=%s", topicName, + rc.getMessage()); + log.error(errorInfo, rc); + throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo)); }); } @@ -3214,7 +3232,7 @@ public class BrokerService implements Closeable { checkOwnershipAndCreatePersistentTopic(topic, true, pendingFuture, - pendingTopic.getProperties(), pendingTopic.getTopicPolicies()); + pendingTopic.getProperties()); pendingFuture.handle((persistentTopic, ex) -> { // release permit and process next pending topic if (acquiredPermit) { @@ -3821,6 +3839,5 @@ public class BrokerService implements Closeable { private final String topic; private final CompletableFuture<Optional<Topic>> topicFuture; private final Map<String, String> properties; - private final TopicPolicies topicPolicies; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 88a4e1d755a..50eba440747 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -56,6 +56,7 @@ import org.apache.pulsar.common.events.PulsarEvent; import org.apache.pulsar.common.events.TopicPoliciesEvent; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.util.FutureUtil; @@ -312,6 +313,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NotNull TopicName topicName, boolean isGlobal) { requireNonNull(topicName); + final var namespace = topicName.getNamespaceObject(); + if (NamespaceService.isHeartbeatNamespace(namespace) || isSelf(topicName)) { + return CompletableFuture.completedFuture(Optional.empty()); + } final CompletableFuture<Void> preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject()); return preparedFuture.thenApply(__ -> { final TopicPolicies candidatePolicies = isGlobal @@ -818,4 +823,13 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic readerCaches.clear(); } } + + private static boolean isSelf(TopicName topicName) { + final var localName = topicName.getLocalName(); + if (!topicName.isPartitioned()) { + return localName.equals(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); + } + final var index = localName.lastIndexOf(TopicName.PARTITIONED_TOPIC_SUFFIX); + return localName.substring(0, index).equals(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 22f4afe910a..bc9bf6288a3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -4129,10 +4129,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) { brokerService.getPulsar().getTopicPoliciesService() .registerListener(TopicName.getPartitionedTopicName(topic), this); - return CompletableFuture.completedFuture(null).thenRunAsync(() -> onUpdate( - brokerService.getPulsar().getTopicPoliciesService() - .getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic))), - brokerService.getTopicOrderedExecutor()); + final var topicPoliciesService = brokerService.getPulsar().getTopicPoliciesService(); + final var partitionedTopicName = TopicName.getPartitionedTopicName(topic); + return topicPoliciesService.getTopicPoliciesAsync(partitionedTopicName, true) + .thenAcceptAsync(optionalPolicies -> optionalPolicies.ifPresent(this::onUpdate), + brokerService.getTopicOrderedExecutor()) + .thenCompose(__ -> topicPoliciesService.getTopicPoliciesAsync(partitionedTopicName, false)) + .thenAcceptAsync(optionalPolicies -> optionalPolicies.ifPresent(this::onUpdate), + brokerService.getTopicOrderedExecutor()); } return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 1d860fbc3fc..ade80f9b2ab 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -609,6 +609,114 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { admin.topics().delete(topic, false); } + @Test + public void testGlobalPolicyStillAffectsAfterUnloading() throws Exception { + // create topic and load it up. + final String namespace = myNamespace; + final String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp"); + final TopicName topicName = TopicName.get(topic); + admin.topics().createNonPartitionedTopic(topic); + pulsarClient.newProducer().topic(topic).create().close(); + final SystemTopicBasedTopicPoliciesService topicPoliciesService + = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); + + // Set non-global policy of the limitation of max consumers. + // Set global policy of the limitation of max producers. + admin.topicPolicies(false).setMaxConsumers(topic, 10); + admin.topicPolicies(true).setMaxProducers(topic, 20); + Awaitility.await().untilAsserted(() -> { + assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, false).join().get() + .getMaxConsumerPerTopic(), 10); + assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, true).join().get() + .getMaxProducerPerTopic(), 20); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopics().get(topic).get().get(); + HierarchyTopicPolicies hierarchyTopicPolicies = persistentTopic.getHierarchyTopicPolicies(); + assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10); + assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 20); + }); + + // Reload topic and verify: both global policy and non-global policy affect. + admin.topics().unload(topic); + pulsarClient.newProducer().topic(topic).create().close(); + Awaitility.await().untilAsserted(() -> { + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopics().get(topic).get().get(); + HierarchyTopicPolicies hierarchyTopicPolicies = persistentTopic.getHierarchyTopicPolicies(); + assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10); + assertEquals(hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 20); + }); + + // cleanup. + admin.topics().delete(topic, false); + } + + @Test + public void testRetentionGlobalPolicyAffects() throws Exception { + // create topic and load it up. + final String namespace = myNamespace; + final String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp"); + final TopicName topicName = TopicName.get(topic); + admin.topics().createNonPartitionedTopic(topic); + pulsarClient.newProducer().topic(topic).create().close(); + final SystemTopicBasedTopicPoliciesService topicPoliciesService + = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); + + // Set non-global policy of the limitation of max consumers. + // Set global policy of the persistence policies. + admin.topicPolicies(false).setMaxConsumers(topic, 10); + RetentionPolicies retentionPolicies = new RetentionPolicies(100, 200); + admin.topicPolicies(true).setRetention(topic, retentionPolicies); + Awaitility.await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, false).join().get() + .getMaxConsumerPerTopic(), 10); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopics().get(topic).get().get(); + HierarchyTopicPolicies hierarchyTopicPolicies = persistentTopic.getHierarchyTopicPolicies(); + assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10); + ManagedLedgerConfig mlConfig = persistentTopic.getManagedLedger().getConfig(); + assertEquals(mlConfig.getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(100)); + assertEquals(mlConfig.getRetentionSizeInMB(), 200); + }); + PersistencePolicies persistencePolicy = new PersistencePolicies(3, 2, 1, 4); + admin.topicPolicies(true).setPersistence(topic, persistencePolicy); + Awaitility.await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, false).join().get() + .getMaxConsumerPerTopic(), 10); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopics().get(topic).get().get(); + HierarchyTopicPolicies hierarchyTopicPolicies = persistentTopic.getHierarchyTopicPolicies(); + assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10); + ManagedLedgerConfig mlConfig = persistentTopic.getManagedLedger().getConfig(); + assertEquals(mlConfig.getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(100)); + assertEquals(mlConfig.getRetentionSizeInMB(), 200); + assertEquals(mlConfig.getEnsembleSize(), 3); + assertEquals(mlConfig.getWriteQuorumSize(), 2); + assertEquals(mlConfig.getAckQuorumSize(), 1); + assertEquals(mlConfig.getThrottleMarkDelete(), 4D); + }); + + // Reload topic and verify: retention policy of global policy affects. + admin.topics().unload(topic); + pulsarClient.newProducer().topic(topic).create().close(); + Awaitility.await().untilAsserted(() -> { + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopics().get(topic).get().get(); + HierarchyTopicPolicies hierarchyTopicPolicies = persistentTopic.getHierarchyTopicPolicies(); + ManagedLedgerConfig mlConfig = persistentTopic.getManagedLedger().getConfig(); + assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10); + assertEquals(mlConfig.getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(100)); + assertEquals(mlConfig.getRetentionSizeInMB(), 200); + assertEquals(mlConfig.getEnsembleSize(), 3); + assertEquals(mlConfig.getWriteQuorumSize(), 2); + assertEquals(mlConfig.getAckQuorumSize(), 1); + assertEquals(mlConfig.getThrottleMarkDelete(), 4D); + }); + + // cleanup. + admin.topics().delete(topic, false); + } + @Test(timeOut = 20000) public void testGetSizeBasedBacklogQuotaApplied() throws Exception { final String topic = testTopic + UUID.randomUUID(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 8407c8d8cef..9be0c67f199 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -1166,7 +1166,7 @@ public class BrokerServiceTest extends BrokerTestBase { // try to create topic which should fail as bundle is disable CompletableFuture<Optional<Topic>> futureResult = pulsar.getBrokerService() - .loadOrCreatePersistentTopic(topicName, true, null, null); + .loadOrCreatePersistentTopic(topicName, true, null); try { futureResult.get();