This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new ec2f17aca56 [fix][broker][branch-2.10] Fix inconsistent topic policy
(#21258)
ec2f17aca56 is described below
commit ec2f17aca565f102762ed3300c0ff393a31ff928
Author: Qiang Zhao <[email protected]>
AuthorDate: Sat Oct 7 14:36:44 2023 +0800
[fix][broker][branch-2.10] Fix inconsistent topic policy (#21258)
---
.../pulsar/broker/service/BrokerService.java | 239 +++++++++++----------
.../SystemTopicBasedTopicPoliciesService.java | 99 ++++++---
.../broker/service/TopicPoliciesService.java | 40 ++++
.../pulsar/broker/admin/TopicPoliciesTest.java | 2 +-
.../admin/TopicPoliciesWithBrokerRestartTest.java | 112 ++++++++++
.../SystemTopicBasedTopicPoliciesServiceTest.java | 6 +-
.../pulsar/client/api/BrokerServiceLookupTest.java | 4 +-
7 files changed, 352 insertions(+), 150 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 08550886ecb..b028bfa3e4f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -150,6 +150,7 @@ import
org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
@@ -1577,137 +1578,141 @@ public class BrokerService implements Closeable {
});
}
- public CompletableFuture<ManagedLedgerConfig>
getManagedLedgerConfig(TopicName topicName) {
+ public CompletableFuture<ManagedLedgerConfig>
getManagedLedgerConfig(@Nonnull TopicName topicName) {
+ requireNonNull(topicName);
NamespaceName namespace = topicName.getNamespaceObject();
ServiceConfiguration serviceConfig = pulsar.getConfiguration();
NamespaceResources nsr =
pulsar.getPulsarResources().getNamespaceResources();
LocalPoliciesResources lpr =
pulsar.getPulsarResources().getLocalPolicies();
- return nsr.getPoliciesAsync(namespace)
- .thenCombine(lpr.getLocalPoliciesAsync(namespace), (policies,
localPolicies) -> {
- PersistencePolicies persistencePolicies = null;
- RetentionPolicies retentionPolicies = null;
- OffloadPoliciesImpl topicLevelOffloadPolicies = null;
-
- if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
- &&
!NamespaceService.isSystemServiceNamespace(namespace.toString())) {
- try {
- TopicPolicies topicPolicies =
pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
- if (topicPolicies != null) {
- persistencePolicies =
topicPolicies.getPersistence();
- retentionPolicies =
topicPolicies.getRetentionPolicies();
- topicLevelOffloadPolicies =
topicPolicies.getOffloadPolicies();
- }
- } catch
(BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.debug("Topic {} policies have not been
initialized yet.", topicName);
- }
- }
-
- if (persistencePolicies == null) {
- persistencePolicies = policies.map(p ->
p.persistence).orElseGet(
- () -> new
PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
-
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
-
serviceConfig.getManagedLedgerDefaultAckQuorum(),
-
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
- }
-
- if (retentionPolicies == null) {
- retentionPolicies = policies.map(p ->
p.retention_policies).orElseGet(
- () -> new
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
-
serviceConfig.getDefaultRetentionSizeInMB())
- );
- }
-
+ final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture;
+ if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
+ && !NamespaceService.isSystemServiceNamespace(namespace.toString())
+ &&
!EventsTopicNames.isTopicPoliciesSystemTopic(topicName.toString())) {
+ topicPoliciesFuture =
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName);
+ } else {
+ topicPoliciesFuture =
CompletableFuture.completedFuture(Optional.empty());
+ }
+ return topicPoliciesFuture.thenCompose(topicPoliciesOptional -> {
+ final CompletableFuture<Optional<Policies>> nsPolicies =
nsr.getPoliciesAsync(namespace);
+ final CompletableFuture<Optional<LocalPolicies>> lcPolicies =
lpr.getLocalPoliciesAsync(namespace);
+ return nsPolicies.thenCombine(lcPolicies, (policies,
localPolicies) -> {
+ PersistencePolicies persistencePolicies = null;
+ RetentionPolicies retentionPolicies = null;
+ OffloadPoliciesImpl topicLevelOffloadPolicies = null;
+ if (topicPoliciesOptional.isPresent()) {
+ final TopicPolicies topicPolicies =
topicPoliciesOptional.get();
+ persistencePolicies = topicPolicies.getPersistence();
+ retentionPolicies = topicPolicies.getRetentionPolicies();
+ topicLevelOffloadPolicies =
topicPolicies.getOffloadPolicies();
+ }
- ManagedLedgerConfig managedLedgerConfig = new
ManagedLedgerConfig();
-
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
-
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
-
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
- if (localPolicies.isPresent() &&
localPolicies.get().bookieAffinityGroup != null) {
- managedLedgerConfig
- .setBookKeeperEnsemblePlacementPolicyClassName(
-
IsolatedBookieEnsemblePlacementPolicy.class);
- Map<String, Object> properties = Maps.newHashMap();
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
- }
-
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
-
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
-
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
+ if (persistencePolicies == null) {
+ persistencePolicies = policies.map(p ->
p.persistence).orElseGet(
+ () -> new
PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
+
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
+
serviceConfig.getManagedLedgerDefaultAckQuorum(),
+
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
+ }
- managedLedgerConfig
-
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
- managedLedgerConfig.setMaxUnackedRangesToPersistInZk(
-
serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
-
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
- managedLedgerConfig
-
.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
- TimeUnit.MINUTES);
- managedLedgerConfig
-
.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
- TimeUnit.MINUTES);
-
managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
+ if (retentionPolicies == null) {
+ retentionPolicies = policies.map(p ->
p.retention_policies).orElseGet(
+ () -> new
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+
serviceConfig.getDefaultRetentionSizeInMB())
+ );
+ }
- managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
-
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
- managedLedgerConfig
-
.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
- managedLedgerConfig
-
.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
-
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
- managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
-
serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
-
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
-
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
- managedLedgerConfig
-
.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
+ ManagedLedgerConfig managedLedgerConfig = new
ManagedLedgerConfig();
+
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
+
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
+
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+ if (localPolicies.isPresent() &&
localPolicies.get().bookieAffinityGroup != null) {
managedLedgerConfig
-
.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
- managedLedgerConfig
-
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(),
TimeUnit.MINUTES);
-
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
-
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
-
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
- managedLedgerConfig.setInactiveLedgerRollOverTime(
-
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(),
TimeUnit.SECONDS);
- managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
-
serviceConfig.isCacheEvictionByMarkDeletedPosition());
-
- OffloadPoliciesImpl nsLevelOffloadPolicies =
- (OffloadPoliciesImpl) policies.map(p ->
p.offload_policies).orElse(null);
- OffloadPoliciesImpl offloadPolicies =
OffloadPoliciesImpl.mergeConfiguration(
- topicLevelOffloadPolicies,
-
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies,
policies.orElse(null)),
- getPulsar().getConfig().getProperties());
- if
(NamespaceService.isSystemServiceNamespace(namespace.toString())) {
-
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
- } else {
- if (topicLevelOffloadPolicies != null) {
- try {
- LedgerOffloader topicLevelLedgerOffLoader =
-
pulsar().createManagedLedgerOffloader(offloadPolicies);
-
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
- } catch (PulsarServerException e) {
- throw new RuntimeException(e);
- }
- } else {
- //If the topic level policy is null, use the
namespace level
- managedLedgerConfig
-
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace,
offloadPolicies));
+ .setBookKeeperEnsemblePlacementPolicyClassName(
+
IsolatedBookieEnsemblePlacementPolicy.class);
+ Map<String, Object> properties = Maps.newHashMap();
+
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
+
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
+
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ }
+
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
+
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
+
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
+
+ managedLedgerConfig
+
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
+ managedLedgerConfig.setMaxUnackedRangesToPersistInZk(
+
serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
+
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
+ managedLedgerConfig
+
.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
+ TimeUnit.MINUTES);
+ managedLedgerConfig
+
.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
+ TimeUnit.MINUTES);
+
managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
+
+ managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
+
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
+ managedLedgerConfig
+
.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
+ managedLedgerConfig
+
.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
+
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
+ managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
+
serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
+
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
+
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
+ managedLedgerConfig
+
.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
+
+ managedLedgerConfig
+
.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
+ managedLedgerConfig
+
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(),
TimeUnit.MINUTES);
+
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
+
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
+
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
+ managedLedgerConfig.setInactiveLedgerRollOverTime(
+
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(),
TimeUnit.SECONDS);
+ managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
+ serviceConfig.isCacheEvictionByMarkDeletedPosition());
+
+ OffloadPoliciesImpl nsLevelOffloadPolicies =
+ (OffloadPoliciesImpl) policies.map(p ->
p.offload_policies).orElse(null);
+ OffloadPoliciesImpl offloadPolicies =
OffloadPoliciesImpl.mergeConfiguration(
+ topicLevelOffloadPolicies,
+
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies,
policies.orElse(null)),
+ getPulsar().getConfig().getProperties());
+ if
(NamespaceService.isSystemServiceNamespace(namespace.toString())) {
+
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
+ } else {
+ if (topicLevelOffloadPolicies != null) {
+ try {
+ LedgerOffloader topicLevelLedgerOffLoader =
+
pulsar().createManagedLedgerOffloader(offloadPolicies);
+
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+ } catch (PulsarServerException e) {
+ throw new RuntimeException(e);
}
+ } else {
+ //If the topic level policy is null, use the namespace
level
+ managedLedgerConfig
+
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace,
offloadPolicies));
}
+ }
- managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
-
serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
- managedLedgerConfig.setNewEntriesCheckDelayInMillis(
-
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
+ managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
+
serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
+ managedLedgerConfig.setNewEntriesCheckDelayInMillis(
+
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
- return managedLedgerConfig;
- });
+ return managedLedgerConfig;
+ });
+ });
}
private void addTopicToStatsMaps(TopicName topicName, Topic topic) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 419c6d88b22..daa2b249a9c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;
+import static java.util.Objects.requireNonNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -25,6 +26,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -78,8 +80,8 @@ public class SystemTopicBasedTopicPoliciesService implements
TopicPoliciesServic
private final Map<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
readerCaches = new ConcurrentHashMap<>();
- @VisibleForTesting
- final Map<NamespaceName, Boolean> policyCacheInitMap = new
ConcurrentHashMap<>();
+
+ final Map<NamespaceName, CompletableFuture<Void>> policyCacheInitMap = new
ConcurrentHashMap<>();
@VisibleForTesting
final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners =
new ConcurrentHashMap<>();
@@ -219,12 +221,12 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
boolean isGlobal) throws
TopicPoliciesCacheNotInitException {
if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
NamespaceName namespace = topicName.getNamespaceObject();
- prepareInitPoliciesCache(namespace, new CompletableFuture<>());
+ prepareInitPoliciesCacheAsync(namespace);
}
MutablePair<TopicPoliciesCacheNotInitException, TopicPolicies> result
= new MutablePair<>();
policyCacheInitMap.compute(topicName.getNamespaceObject(), (k,
initialized) -> {
- if (initialized == null || !initialized) {
+ if (initialized == null || !initialized.isDone()) {
result.setLeft(new TopicPoliciesCacheNotInitException());
} else {
TopicPolicies topicPolicies =
@@ -242,6 +244,34 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
}
+ @Nonnull
+ @Override
+ public CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesAsync(@Nonnull TopicName topicName,
+
boolean isGlobal) {
+ requireNonNull(topicName);
+ final CompletableFuture<Void> preparedFuture =
prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
+ return preparedFuture.thenApply(__ -> {
+ final TopicPolicies candidatePolicies = isGlobal
+ ?
globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()))
+ :
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
+ return Optional.ofNullable(candidatePolicies);
+ });
+ }
+
+ @Nonnull
+ @Override
+ public CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesAsync(@Nonnull TopicName topicName) {
+ requireNonNull(topicName);
+ final CompletableFuture<Void> preparedFuture =
prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
+ return preparedFuture.thenApply(__ -> {
+ final TopicPolicies localPolicies =
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
+ if (localPolicies != null) {
+ return Optional.of(localPolicies);
+ }
+ return
Optional.ofNullable(globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName())));
+ });
+ }
+
@Override
public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
return
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
@@ -265,40 +295,49 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
@Override
public CompletableFuture<Void>
addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
- CompletableFuture<Void> result = new CompletableFuture<>();
NamespaceName namespace = namespaceBundle.getNamespaceObject();
if (NamespaceService.checkHeartbeatNamespace(namespace) != null
|| NamespaceService.checkHeartbeatNamespaceV2(namespace) !=
null) {
- result.complete(null);
- return result;
+ return CompletableFuture.completedFuture(null);
}
synchronized (this) {
if (readerCaches.get(namespace) != null) {
ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
- result.complete(null);
+ return CompletableFuture.completedFuture(null);
} else {
- prepareInitPoliciesCache(namespace, result);
+ return prepareInitPoliciesCacheAsync(namespace);
}
}
- return result;
}
- private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace,
CompletableFuture<Void> result) {
- if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
- CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
+ private @Nonnull CompletableFuture<Void>
prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
+ requireNonNull(namespace);
+ return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
+ final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
createSystemTopicClientWithRetry(namespace);
readerCaches.put(namespace, readerCompletableFuture);
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new
AtomicInteger(1));
- readerCompletableFuture.thenAccept(reader -> {
- initPolicesCache(reader, result);
- result.thenRun(() -> readMorePolicies(reader));
- }).exceptionally(ex -> {
- log.error("[{}] Failed to create reader on __change_events
topic", namespace, ex);
- cleanCacheAndCloseReader(namespace, false);
- result.completeExceptionally(ex);
+ final CompletableFuture<Void> initFuture = readerCompletableFuture
+ .thenCompose(reader -> {
+ final CompletableFuture<Void> stageFuture = new
CompletableFuture<>();
+ initPolicesCache(reader, stageFuture);
+ return stageFuture
+ // Read policies in background
+ .thenAccept(__ ->
readMorePoliciesAsync(reader));
+ });
+ initFuture.exceptionally(ex -> {
+ try {
+ log.error("[{}] Failed to create reader on __change_events
topic", namespace, ex);
+ cleanCacheAndCloseReader(namespace, false);
+ } catch (Throwable cleanupEx) {
+ // Adding this catch to avoid breaking the callback chain
+ log.error("[{}] Failed to cleanup reader on
__change_events topic", namespace, cleanupEx);
+ }
return null;
});
- }
+ // let caller know we've got an exception.
+ return initFuture;
+ });
}
protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
createSystemTopicClientWithRetry(
@@ -382,8 +421,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
if (log.isDebugEnabled()) {
log.debug("[{}] Reach the end of the system topic.",
reader.getSystemTopic().getTopicName());
}
- policyCacheInitMap.computeIfPresent(
-
reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
+
// replay policy message
policiesCache.forEach(((topicName, topicPolicies) -> {
if (listeners.get(topicName) != null) {
@@ -396,6 +434,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
}
}));
+
future.complete(null);
}
});
@@ -421,7 +460,13 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
});
}
- private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent>
reader) {
+ /**
+ * This is an async method for the background reader to continue syncing
new messages.
+ *
+ * Note: Do not make any blocking call here, because it would cause the
+ * {@link
SystemTopicBasedTopicPoliciesService#getTopicPoliciesAsync(TopicName)} method
to block topic loading.
+ */
+ private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent>
reader) {
reader.readNextAsync()
.thenAccept(msg -> {
refreshTopicPoliciesCache(msg);
@@ -429,7 +474,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
})
.whenComplete((__, ex) -> {
if (ex == null) {
- readMorePolicies(reader);
+ readMorePoliciesAsync(reader);
} else {
Throwable cause =
FutureUtil.unwrapCompletionException(ex);
if (cause instanceof
PulsarClientException.AlreadyClosedException) {
@@ -438,7 +483,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
} else {
log.warn("Read more topic polices exception, read
again.", ex);
- readMorePolicies(reader);
+ readMorePoliciesAsync(reader);
}
}
});
@@ -591,7 +636,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
@VisibleForTesting
- public Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
+ public CompletableFuture<Void> getPoliciesCacheInit(NamespaceName
namespaceName) {
return policyCacheInitMap.get(namespaceName);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 5b2aa6e8ce9..98c8e71bee0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -22,6 +22,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
@@ -109,6 +110,32 @@ public interface TopicPoliciesService {
return response;
}
+ /**
+ * Asynchronously retrieves topic policies.
+ * This triggers the Pulsar broker's internal client to load policies from
the
+ * system topic `persistent://tenant/namespace/__change_events`.
+ *
+ * @param topicName The name of the topic.
+ * @param isGlobal Indicates if the policies are global.
+ * @return A CompletableFuture containing an Optional of TopicPolicies.
+ * @throws NullPointerException If the topicName is null.
+ */
+ @Nonnull
+ CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull
TopicName topicName, boolean isGlobal);
+
+ /**
+ * Asynchronously retrieves topic policies.
+ * This triggers the Pulsar broker's internal client to load policies from
the
+ * system topic `persistent://tenant/namespace/__change_events`.
+ *
+ * NOTE: If local policies are not available, it will fall back to using
topic global policies.
+ * @param topicName The name of the topic.
+ * @return A CompletableFuture containing an Optional of TopicPolicies.
+ * @throws NullPointerException If the topicName is null.
+ */
+ @Nonnull
+ CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull
TopicName topicName);
+
/**
* Get policies for a topic without cache async.
* @param topicName topic name
@@ -162,6 +189,19 @@ public interface TopicPoliciesService {
return null;
}
+ @Nonnull
+ @Override
+ public CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesAsync(@Nonnull TopicName topicName,
+
boolean isGlobal) {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+
+ @Nonnull
+ @Override
+ public CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesAsync(@Nonnull TopicName topicName) {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+
@Override
public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
return null;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index f3e3188de4b..adfa95e8f4d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -169,7 +169,7 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
//make sure namespace policy reader is fully started.
Awaitility.await().untilAsserted(()-> {
-
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()));
+
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()).isDone());
});
//load the topic.
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java
new file mode 100644
index 00000000000..f39981eec48
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@Test(groups = "broker-admin")
+public class TopicPoliciesWithBrokerRestartTest extends
MockedPulsarServiceBaseTest {
+
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ conf.setSystemTopicEnabled(true);
+ conf.setTopicLevelPoliciesEnabled(true);
+ }
+
+ @Override
+ @BeforeClass(alwaysRun = true)
+ protected void setup() throws Exception {
+ super.internalSetup();
+ setupDefaultTenantAndNamespace();
+ }
+
+ @Override
+ @AfterClass(alwaysRun = true)
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+
+ @Test
+ public void testRetentionWithBrokerRestart() throws Exception {
+ final int messages = 1_000;
+ final int topicNum = 500;
+ // (1) Init topic
+ admin.namespaces().createNamespace("public/retention");
+ final String topicName =
"persistent://public/retention/retention_with_broker_restart";
+ admin.topics().createNonPartitionedTopic(topicName);
+ for (int i = 0; i < topicNum; i++) {
+ final String shadowTopicNames = topicName + "_" + i;
+ admin.topics().createNonPartitionedTopic(shadowTopicNames);
+ }
+ // (2) Set retention
+ final RetentionPolicies retentionPolicies = new RetentionPolicies(20,
20);
+ for (int i = 0; i < topicNum; i++) {
+ final String shadowTopicNames = topicName + "_" + i;
+ admin.topicPolicies().setRetention(shadowTopicNames,
retentionPolicies);
+ }
+ admin.topicPolicies().setRetention(topicName, retentionPolicies);
+ // (3) Send messages
+ @Cleanup
+ final Producer<byte[]> publisher = pulsarClient.newProducer()
+ .topic(topicName)
+ .create();
+ for (int i = 0; i < messages; i++) {
+ publisher.send((i + "").getBytes(StandardCharsets.UTF_8));
+ }
+ // (4) Check configuration
+ Awaitility.await().untilAsserted(() -> {
+ final PersistentTopic persistentTopic1 = (PersistentTopic)
+ pulsar.getBrokerService().getTopic(topicName,
true).join().get();
+ final ManagedLedgerImpl managedLedger1 = (ManagedLedgerImpl)
persistentTopic1.getManagedLedger();
+
Assert.assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(), 20);
+
Assert.assertEquals(managedLedger1.getConfig().getRetentionTimeMillis(),
+ TimeUnit.MINUTES.toMillis(20));
+ });
+ // (5) Restart broker
+ restartBroker();
+ // (6) Check configuration again
+ for (int i = 0; i < topicNum; i++) {
+ final String shadowTopicNames = topicName + "_" + i;
+ admin.lookups().lookupTopic(shadowTopicNames);
+ final PersistentTopic persistentTopicTmp = (PersistentTopic)
+ pulsar.getBrokerService().getTopic(shadowTopicNames,
true).join().get();
+ final ManagedLedgerImpl managedLedgerTemp = (ManagedLedgerImpl)
persistentTopicTmp.getManagedLedger();
+
Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionSizeInMB(), 20);
+
Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionTimeMillis(),
+ TimeUnit.MINUTES.toMillis(20));
+ }
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index c90c4a3a5bc..e69230b2d2d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -142,7 +142,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
// Wait for all topic policies updated.
Awaitility.await().untilAsserted(() ->
Assert.assertTrue(systemTopicBasedTopicPoliciesService
- .getPoliciesCacheInit(TOPIC1.getNamespaceObject())));
+
.getPoliciesCacheInit(TOPIC1.getNamespaceObject()).isDone()));
// Assert broker is cache all topic policies
Awaitility.await().untilAsserted(() ->
@@ -305,8 +305,8 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
@Test
public void testGetPolicyTimeout() throws Exception {
SystemTopicBasedTopicPoliciesService service =
(SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
- Awaitility.await().untilAsserted(() ->
assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject())));
- service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), false);
+ Awaitility.await().untilAsserted(() ->
assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject()).isDone()));
+ service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), new
CompletableFuture<>());
long start = System.currentTimeMillis();
Backoff backoff = new BackoffBuilder()
.setInitialTime(500, TimeUnit.MILLISECONDS)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 3b018e70481..dac7f4ab2aa 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -923,9 +923,9 @@ public class BrokerServiceLookupTest extends
ProducerConsumerBase {
}
private int calculateLookupRequestCount() throws Exception {
- int failures =
CollectorRegistry.defaultRegistry.getSampleValue("pulsar_broker_lookup_failures_total")
+ int failures =
CollectorRegistry.defaultRegistry.getSampleValue("pulsar_broker_lookup_failures")
.intValue();
- int answers =
CollectorRegistry.defaultRegistry.getSampleValue("pulsar_broker_lookup_answers_total")
+ int answers =
CollectorRegistry.defaultRegistry.getSampleValue("pulsar_broker_lookup_answers")
.intValue();
return failures + answers;
}