This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 6b84514bfab [fix][broker] Fix namespace bundle stuck in unloading
status (#21445) (#21565)
6b84514bfab is described below
commit 6b84514bfabcbe1d07d67c8a734fb238ce01e89c
Author: Qiang Zhao <[email protected]>
AuthorDate: Tue Nov 14 21:46:06 2023 +0800
[fix][broker] Fix namespace bundle stuck in unloading status (#21445)
(#21565)
Cherry-pick #21445 to branch-3.0
---
.../pulsar/broker/service/BrokerService.java | 372 +++++++++++----------
.../SystemTopicBasedTopicPoliciesService.java | 19 +-
.../pulsar/broker/admin/TopicAutoCreationTest.java | 8 +-
.../broker/namespace/NamespaceUnloadingTest.java | 29 ++
.../pulsar/broker/service/BrokerServiceTest.java | 2 +-
.../SystemTopicBasedTopicPoliciesServiceTest.java | 29 --
.../pulsar/client/cli/PulsarClientToolTest.java | 3 +
7 files changed, 247 insertions(+), 215 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2d51ef6ef30..4d02e7e8773 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -49,6 +49,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
@@ -70,6 +71,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -1051,19 +1053,32 @@ public class BrokerService implements Closeable {
}
final boolean isPersistentTopic =
topicName.getDomain().equals(TopicDomain.persistent);
if (isPersistentTopic) {
- return topics.computeIfAbsent(topicName.toString(), (tpName)
-> {
- if (topicName.isPartitioned()) {
- return
fetchPartitionedTopicMetadataAsync(TopicName.get(topicName.getPartitionedTopicName()))
- .thenCompose((metadata) -> {
- // Allow crate non-partitioned persistent
topic that name includes `partition`
- if (metadata.partitions == 0
- || topicName.getPartitionIndex() <
metadata.partitions) {
- return
loadOrCreatePersistentTopic(tpName, createIfMissing, properties);
- }
- return
CompletableFuture.completedFuture(Optional.empty());
- });
- }
- return loadOrCreatePersistentTopic(tpName,
createIfMissing, properties);
+ final CompletableFuture<Optional<TopicPolicies>>
topicPoliciesFuture =
+ getTopicPoliciesBypassSystemTopic(topicName);
+ return topicPoliciesFuture.exceptionally(ex -> {
+ final Throwable rc =
FutureUtil.unwrapCompletionException(ex);
+ final String errorInfo = String.format("Topic creation
encountered an exception by initialize"
+ + " topic policies service. topic_name=%s
error_message=%s", topicName, rc.getMessage());
+ log.error(errorInfo, rc);
+ throw FutureUtil.wrapToCompletionException(new
ServiceUnitNotReadyException(errorInfo));
+ }).thenCompose(optionalTopicPolicies -> {
+ final TopicPolicies topicPolicies =
optionalTopicPolicies.orElse(null);
+ return topics.computeIfAbsent(topicName.toString(),
(tpName) -> {
+ if (topicName.isPartitioned()) {
+ final TopicName topicNameEntity =
TopicName.get(topicName.getPartitionedTopicName());
+ return
fetchPartitionedTopicMetadataAsync(topicNameEntity)
+ .thenCompose((metadata) -> {
+ // Allow creating a non-partitioned
persistent topic whose name includes `partition`
+ if (metadata.partitions == 0
+ ||
topicName.getPartitionIndex() < metadata.partitions) {
+ return
loadOrCreatePersistentTopic(tpName, createIfMissing,
+ properties, topicPolicies);
+ }
+ return
CompletableFuture.completedFuture(Optional.empty());
+ });
+ }
+ return loadOrCreatePersistentTopic(tpName,
createIfMissing, properties, topicPolicies);
+ });
});
} else {
return topics.computeIfAbsent(topicName.toString(), (name) -> {
@@ -1117,6 +1132,18 @@ public class BrokerService implements Closeable {
}
}
+ private CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesBypassSystemTopic(@Nonnull TopicName topicName) {
+ Objects.requireNonNull(topicName);
+ final ServiceConfiguration serviceConfiguration =
pulsar.getConfiguration();
+ if (serviceConfiguration.isSystemTopicEnabled() &&
serviceConfiguration.isTopicLevelPoliciesEnabled()
+ &&
!NamespaceService.isSystemServiceNamespace(topicName.getNamespace())
+ &&
!SystemTopicNames.isTopicPoliciesSystemTopic(topicName.toString())) {
+ return
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName);
+ } else {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+ }
+
public CompletableFuture<Void> deleteTopic(String topic, boolean
forceDelete) {
topicEventsDispatcher.notify(topic, TopicEvent.DELETE,
EventStage.BEFORE);
CompletableFuture<Void> result = deleteTopicInternal(topic,
forceDelete);
@@ -1521,7 +1548,7 @@ public class BrokerService implements Closeable {
* @throws RuntimeException
*/
protected CompletableFuture<Optional<Topic>>
loadOrCreatePersistentTopic(final String topic,
- boolean createIfMissing, Map<String, String> properties) throws
RuntimeException {
+ boolean createIfMissing, Map<String, String> properties, @Nullable
TopicPolicies topicPolicies) {
final CompletableFuture<Optional<Topic>> topicFuture =
FutureUtil.createFutureWithTimeout(
Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()),
executor(),
() -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
@@ -1539,7 +1566,8 @@ public class BrokerService implements Closeable {
final Semaphore topicLoadSemaphore =
topicLoadRequestSemaphore.get();
if (topicLoadSemaphore.tryAcquire()) {
- checkOwnershipAndCreatePersistentTopic(topic,
createIfMissing, topicFuture, properties);
+ checkOwnershipAndCreatePersistentTopic(topic,
createIfMissing, topicFuture,
+ properties, topicPolicies);
topicFuture.handle((persistentTopic, ex) -> {
// release permit and process pending topic
topicLoadSemaphore.release();
@@ -1547,7 +1575,8 @@ public class BrokerService implements Closeable {
return null;
});
} else {
- pendingTopicLoadingQueue.add(new
TopicLoadingContext(topic, topicFuture, properties));
+ pendingTopicLoadingQueue.add(new
TopicLoadingContext(topic,
+ topicFuture, properties, topicPolicies));
if (log.isDebugEnabled()) {
log.debug("topic-loading for {} added into pending
queue", topic);
}
@@ -1588,7 +1617,7 @@ public class BrokerService implements Closeable {
private void checkOwnershipAndCreatePersistentTopic(final String topic,
boolean createIfMissing,
CompletableFuture<Optional<Topic>>
topicFuture,
- Map<String, String> properties) {
+ Map<String, String> properties,
@Nullable TopicPolicies topicPolicies) {
TopicName topicName = TopicName.get(topic);
pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName)
.thenAccept(isActive -> {
@@ -1602,7 +1631,8 @@ public class BrokerService implements Closeable {
}
propertiesFuture.thenAccept(finalProperties ->
//TODO add topicName in properties?
- createPersistentTopic(topic, createIfMissing,
topicFuture, finalProperties)
+ createPersistentTopic(topic, createIfMissing,
topicFuture,
+ finalProperties, topicPolicies)
).exceptionally(throwable -> {
log.warn("[{}] Read topic property failed", topic,
throwable);
pulsar.getExecutor().execute(() ->
topics.remove(topic, topicFuture));
@@ -1627,12 +1657,12 @@ public class BrokerService implements Closeable {
public void createPersistentTopic0(final String topic, boolean
createIfMissing,
CompletableFuture<Optional<Topic>>
topicFuture,
Map<String, String> properties) {
- createPersistentTopic(topic, createIfMissing, topicFuture, properties);
+ createPersistentTopic(topic, createIfMissing, topicFuture, properties,
null);
}
private void createPersistentTopic(final String topic, boolean
createIfMissing,
CompletableFuture<Optional<Topic>>
topicFuture,
- Map<String, String> properties) {
+ Map<String, String> properties,
@Nullable TopicPolicies topicPolicies) {
TopicName topicName = TopicName.get(topic);
final long topicCreateTimeMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
@@ -1648,7 +1678,8 @@ public class BrokerService implements Closeable {
? checkMaxTopicsPerNamespace(topicName, 1)
: CompletableFuture.completedFuture(null);
- maxTopicsCheck.thenCompose(__ ->
getManagedLedgerConfig(topicName)).thenAccept(managedLedgerConfig -> {
+ maxTopicsCheck.thenCompose(__ ->
+ getManagedLedgerConfig(topicName,
topicPolicies)).thenAccept(managedLedgerConfig -> {
if (isBrokerEntryMetadataEnabled() ||
isBrokerPayloadProcessorEnabled()) {
// init managedLedger interceptor
Set<BrokerEntryMetadataInterceptor> interceptors = new
HashSet<>();
@@ -1774,170 +1805,167 @@ public class BrokerService implements Closeable {
}
public CompletableFuture<ManagedLedgerConfig>
getManagedLedgerConfig(@Nonnull TopicName topicName) {
+ final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture =
+ getTopicPoliciesBypassSystemTopic(topicName);
+ return topicPoliciesFuture.thenCompose(optionalTopicPolicies ->
+ getManagedLedgerConfig(topicName,
optionalTopicPolicies.orElse(null)));
+ }
+
+ private CompletableFuture<ManagedLedgerConfig>
getManagedLedgerConfig(@Nonnull TopicName topicName,
+
@Nullable TopicPolicies topicPolicies) {
requireNonNull(topicName);
NamespaceName namespace = topicName.getNamespaceObject();
ServiceConfiguration serviceConfig = pulsar.getConfiguration();
NamespaceResources nsr =
pulsar.getPulsarResources().getNamespaceResources();
LocalPoliciesResources lpr =
pulsar.getPulsarResources().getLocalPolicies();
- final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture;
- if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
- && !NamespaceService.isSystemServiceNamespace(namespace.toString())
- &&
!SystemTopicNames.isTopicPoliciesSystemTopic(topicName.toString())) {
- topicPoliciesFuture =
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName);
- } else {
- topicPoliciesFuture =
CompletableFuture.completedFuture(Optional.empty());
- }
- return topicPoliciesFuture.thenCompose(topicPoliciesOptional -> {
- final CompletableFuture<Optional<Policies>> nsPolicies =
nsr.getPoliciesAsync(namespace);
- final CompletableFuture<Optional<LocalPolicies>> lcPolicies =
lpr.getLocalPoliciesAsync(namespace);
- return nsPolicies.thenCombine(lcPolicies, (policies,
localPolicies) -> {
- PersistencePolicies persistencePolicies = null;
- RetentionPolicies retentionPolicies = null;
- OffloadPoliciesImpl topicLevelOffloadPolicies = null;
- if (topicPoliciesOptional.isPresent()) {
- final TopicPolicies topicPolicies =
topicPoliciesOptional.get();
- persistencePolicies = topicPolicies.getPersistence();
- retentionPolicies = topicPolicies.getRetentionPolicies();
- topicLevelOffloadPolicies =
topicPolicies.getOffloadPolicies();
- }
-
- if (persistencePolicies == null) {
- persistencePolicies = policies.map(p ->
p.persistence).orElseGet(
- () -> new
PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
-
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
-
serviceConfig.getManagedLedgerDefaultAckQuorum(),
-
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
- }
+ final CompletableFuture<Optional<Policies>> nsPolicies =
nsr.getPoliciesAsync(namespace);
+ final CompletableFuture<Optional<LocalPolicies>> lcPolicies =
lpr.getLocalPoliciesAsync(namespace);
+ return nsPolicies.thenCombine(lcPolicies, (policies, localPolicies) ->
{
+ PersistencePolicies persistencePolicies = null;
+ RetentionPolicies retentionPolicies = null;
+ OffloadPoliciesImpl topicLevelOffloadPolicies = null;
+ if (topicPolicies != null) {
+ persistencePolicies = topicPolicies.getPersistence();
+ retentionPolicies = topicPolicies.getRetentionPolicies();
+ topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
+ }
- if (retentionPolicies == null) {
- retentionPolicies = policies.map(p ->
p.retention_policies).orElseGet(
- () -> new
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
-
serviceConfig.getDefaultRetentionSizeInMB())
- );
- }
+ if (persistencePolicies == null) {
+ persistencePolicies = policies.map(p ->
p.persistence).orElseGet(
+ () -> new
PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
+
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
+
serviceConfig.getManagedLedgerDefaultAckQuorum(),
+
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
+ }
- ManagedLedgerConfig managedLedgerConfig = new
ManagedLedgerConfig();
-
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
-
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
-
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+ if (retentionPolicies == null) {
+ retentionPolicies = policies.map(p ->
p.retention_policies).orElseGet(
+ () -> new
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+ serviceConfig.getDefaultRetentionSizeInMB())
+ );
+ }
- if (serviceConfig.isStrictBookieAffinityEnabled()) {
+ ManagedLedgerConfig managedLedgerConfig = new
ManagedLedgerConfig();
+
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
+
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
+
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+
+ if (serviceConfig.isStrictBookieAffinityEnabled()) {
+
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
+ IsolatedBookieEnsemblePlacementPolicy.class);
+ if (localPolicies.isPresent() &&
localPolicies.get().bookieAffinityGroup != null) {
+ Map<String, Object> properties = new HashMap<>();
+
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
+
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
+
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ } else if (isSystemTopic(topicName)) {
+ Map<String, Object> properties = new HashMap<>();
+
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
"*");
+ properties.put(IsolatedBookieEnsemblePlacementPolicy
+ .SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
+
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ } else {
+ Map<String, Object> properties = new HashMap<>();
+
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
"");
+
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
"");
+
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ }
+ } else {
+ if (localPolicies.isPresent() &&
localPolicies.get().bookieAffinityGroup != null) {
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
IsolatedBookieEnsemblePlacementPolicy.class);
- if (localPolicies.isPresent() &&
localPolicies.get().bookieAffinityGroup != null) {
- Map<String, Object> properties = new HashMap<>();
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
- } else if (isSystemTopic(topicName)) {
- Map<String, Object> properties = new HashMap<>();
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
"*");
- properties.put(IsolatedBookieEnsemblePlacementPolicy
- .SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
-
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
- } else {
- Map<String, Object> properties = new HashMap<>();
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
"");
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
"");
-
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
- }
- } else {
- if (localPolicies.isPresent() &&
localPolicies.get().bookieAffinityGroup != null) {
-
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
- IsolatedBookieEnsemblePlacementPolicy.class);
- Map<String, Object> properties = new HashMap<>();
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
-
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
-
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
- }
+ Map<String, Object> properties = new HashMap<>();
+
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
+
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
+
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
}
+ }
-
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
-
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
-
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
-
- managedLedgerConfig
-
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
-
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
-
serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
-
managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(
-
serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore());
-
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
- managedLedgerConfig
-
.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
- TimeUnit.MINUTES);
- managedLedgerConfig
-
.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
- TimeUnit.MINUTES);
-
managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
-
- managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
-
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
- managedLedgerConfig
-
.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
- managedLedgerConfig
-
.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
-
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
- managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
-
serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
-
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
-
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
- managedLedgerConfig
-
.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
-
- managedLedgerConfig
-
.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
- managedLedgerConfig
-
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(),
TimeUnit.MINUTES);
-
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
-
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
-
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
- managedLedgerConfig.setInactiveLedgerRollOverTime(
-
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(),
TimeUnit.SECONDS);
- managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
- serviceConfig.isCacheEvictionByMarkDeletedPosition());
- managedLedgerConfig.setMinimumBacklogCursorsForCaching(
-
serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
- managedLedgerConfig.setMinimumBacklogEntriesForCaching(
-
serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
- managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
-
serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
-
- OffloadPoliciesImpl nsLevelOffloadPolicies =
- (OffloadPoliciesImpl) policies.map(p ->
p.offload_policies).orElse(null);
- OffloadPoliciesImpl offloadPolicies =
OffloadPoliciesImpl.mergeConfiguration(
- topicLevelOffloadPolicies,
-
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies,
policies.orElse(null)),
- getPulsar().getConfig().getProperties());
- if
(NamespaceService.isSystemServiceNamespace(namespace.toString())) {
-
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
- } else {
- if (topicLevelOffloadPolicies != null) {
- try {
- LedgerOffloader topicLevelLedgerOffLoader =
-
pulsar().createManagedLedgerOffloader(offloadPolicies);
-
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
- } catch (PulsarServerException e) {
- throw new RuntimeException(e);
- }
- } else {
- //If the topic level policy is null, use the namespace
level
- managedLedgerConfig
-
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace,
offloadPolicies));
+
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
+
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
+
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
+
+ managedLedgerConfig
+
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
+
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
+
serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
+ managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(
+
serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore());
+
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
+ managedLedgerConfig
+
.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
+ TimeUnit.MINUTES);
+ managedLedgerConfig
+
.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
+ TimeUnit.MINUTES);
+
managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
+
+ managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
+
serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
+ managedLedgerConfig
+
.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
+ managedLedgerConfig
+
.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
+
managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
+ managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
+
serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
+
managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
+
managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
+ managedLedgerConfig
+
.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
+
+ managedLedgerConfig
+
.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
+ managedLedgerConfig
+
.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(),
TimeUnit.MINUTES);
+
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
+
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
+
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
+ managedLedgerConfig.setInactiveLedgerRollOverTime(
+
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(),
TimeUnit.SECONDS);
+ managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
+ serviceConfig.isCacheEvictionByMarkDeletedPosition());
+ managedLedgerConfig.setMinimumBacklogCursorsForCaching(
+
serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
+ managedLedgerConfig.setMinimumBacklogEntriesForCaching(
+
serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
+ managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
+
serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
+
+ OffloadPoliciesImpl nsLevelOffloadPolicies =
+ (OffloadPoliciesImpl) policies.map(p ->
p.offload_policies).orElse(null);
+ OffloadPoliciesImpl offloadPolicies =
OffloadPoliciesImpl.mergeConfiguration(
+ topicLevelOffloadPolicies,
+
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies,
policies.orElse(null)),
+ getPulsar().getConfig().getProperties());
+ if
(NamespaceService.isSystemServiceNamespace(namespace.toString())) {
+
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
+ } else {
+ if (topicLevelOffloadPolicies != null) {
+ try {
+ LedgerOffloader topicLevelLedgerOffLoader =
+
pulsar().createManagedLedgerOffloader(offloadPolicies);
+
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+ } catch (PulsarServerException e) {
+ throw new RuntimeException(e);
}
+ } else {
+ //If the topic level policy is null, use the namespace
level
+ managedLedgerConfig
+
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace,
offloadPolicies));
}
+ }
- managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
-
serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
- managedLedgerConfig.setNewEntriesCheckDelayInMillis(
-
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
- return managedLedgerConfig;
- });
+ managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
+ serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
+ managedLedgerConfig.setNewEntriesCheckDelayInMillis(
+
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
+ return managedLedgerConfig;
});
}
@@ -3043,7 +3071,10 @@ public class BrokerService implements Closeable {
CompletableFuture<Optional<Topic>> pendingFuture =
pendingTopic.getTopicFuture();
final Semaphore topicLoadSemaphore =
topicLoadRequestSemaphore.get();
final boolean acquiredPermit = topicLoadSemaphore.tryAcquire();
- checkOwnershipAndCreatePersistentTopic(topic, true, pendingFuture,
pendingTopic.getProperties());
+ checkOwnershipAndCreatePersistentTopic(topic,
+ true,
+ pendingFuture,
+ pendingTopic.getProperties(),
pendingTopic.getTopicPolicies());
pendingFuture.handle((persistentTopic, ex) -> {
// release permit and process next pending topic
if (acquiredPermit) {
@@ -3614,5 +3645,6 @@ public class BrokerService implements Closeable {
private final String topic;
private final CompletableFuture<Optional<Topic>> topicFuture;
private final Map<String, String> properties;
+ private final TopicPolicies topicPolicies;
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index da312340954..80fecbe67b6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -29,7 +29,6 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.tuple.MutablePair;
@@ -43,10 +42,8 @@ import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
-import org.apache.pulsar.client.util.RetryUtil;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.PulsarEvent;
@@ -320,7 +317,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
requireNonNull(namespace);
return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
- createSystemTopicClientWithRetry(namespace);
+ createSystemTopicClient(namespace);
readerCaches.put(namespace, readerCompletableFuture);
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new
AtomicInteger(1));
final CompletableFuture<Void> initFuture = readerCompletableFuture
@@ -346,20 +343,16 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
});
}
- protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
createSystemTopicClientWithRetry(
+ protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
createSystemTopicClient(
NamespaceName namespace) {
- CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new
CompletableFuture<>();
try {
createSystemTopicFactoryIfNeeded();
- } catch (PulsarServerException e) {
- result.completeExceptionally(e);
- return result;
+ } catch (PulsarServerException ex) {
+ return FutureUtil.failedFuture(ex);
}
- SystemTopicClient<PulsarEvent> systemTopicClient =
namespaceEventsSystemTopicFactory
+ final SystemTopicClient<PulsarEvent> systemTopicClient =
namespaceEventsSystemTopicFactory
.createTopicPoliciesSystemTopicClient(namespace);
- Backoff backoff = new Backoff(1, TimeUnit.SECONDS, 3,
TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
- RetryUtil.retryAsynchronously(systemTopicClient::newReaderAsync,
backoff, pulsarService.getExecutor(), result);
- return result;
+ return systemTopicClient.newReaderAsync();
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index d447787d1b7..590edc2d3f3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -135,6 +135,8 @@ public class TopicAutoCreationTest extends
ProducerConsumerBase {
new InetSocketAddress(pulsar.getAdvertisedAddress(),
pulsar.getBrokerListenPort().get());
return
CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
});
+ final String topicPoliciesServiceInitException
+ = "Topic creation encountered an exception by initialize
topic policies service";
// Creating a producer and creating a Consumer may trigger
automatic topic
// creation, let's try to create a Producer and a Consumer
@@ -145,7 +147,8 @@ public class TopicAutoCreationTest extends
ProducerConsumerBase {
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by
this instance";
log.info("Expected error", expected);
- assertTrue(expected.getMessage().contains(String.format(msg,
topic)));
+ assertTrue(expected.getMessage().contains(String.format(msg,
topic))
+ ||
expected.getMessage().contains(topicPoliciesServiceInitException));
}
try (Consumer<byte[]> ignored = pulsarClient.newConsumer()
@@ -155,7 +158,8 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by this instance";
log.info("Expected error", expected);
- assertTrue(expected.getMessage().contains(String.format(msg, topic)));
+ assertTrue(expected.getMessage().contains(String.format(msg, topic))
+ || expected.getMessage().contains(topicPoliciesServiceInitException));
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceUnloadingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceUnloadingTest.java
index 0dbfe176087..1526611874a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceUnloadingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceUnloadingTest.java
@@ -22,8 +22,12 @@ import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
+import lombok.Cleanup;
import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -34,6 +38,9 @@ public class NamespaceUnloadingTest extends BrokerTestBase {
@BeforeMethod
@Override
protected void setup() throws Exception {
+ conf.setTopicLevelPoliciesEnabled(true);
+ conf.setForceDeleteNamespaceAllowed(true);
+ conf.setTopicLoadTimeoutSeconds(Integer.MAX_VALUE);
super.baseSetup();
}
@@ -68,4 +75,26 @@ public class NamespaceUnloadingTest extends BrokerTestBase {
producer.close();
}
+ @Test
+ public void testUnloadWithTopicCreation() throws PulsarAdminException, PulsarClientException {
+ final String namespaceName = "prop/ns_unloading";
+ final String topicName = "persistent://prop/ns_unloading/with_topic_creation";
+ final int partitions = 5;
+ admin.namespaces().createNamespace(namespaceName, 1);
+ admin.topics().createPartitionedTopic(topicName, partitions);
+ @Cleanup
+ Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topicName)
+ .create();
+
+ for (int i = 0; i < 100; i++) {
+ admin.namespaces().unloadNamespaceBundle(namespaceName, "0x00000000_0xffffffff");
+ }
+
+ for (int i = 0; i < partitions; i++) {
+ producer.send(i);
+ }
+ admin.namespaces().deleteNamespace(namespaceName, true);
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 2489aa5f026..57bf3a2a866 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1152,7 +1152,7 @@ public class BrokerServiceTest extends BrokerTestBase {
// try to create topic which should fail as bundle is disable
CompletableFuture<Optional<Topic>> futureResult =
pulsar.getBrokerService()
- .loadOrCreatePersistentTopic(topicName, true, null);
+ .loadOrCreatePersistentTopic(topicName, true, null, null);
try {
futureResult.get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 5b70ff99675..ba5e42867d3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -18,9 +18,6 @@
*/
package org.apache.pulsar.broker.service;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
@@ -43,11 +40,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
-import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
-import org.apache.pulsar.broker.systopic.TopicPoliciesSystemTopicClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.common.events.PulsarEvent;
@@ -56,7 +50,6 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
-import org.apache.pulsar.common.util.FutureUtil;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
@@ -321,28 +314,6 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
assertTrue("actual:" + cost, cost >= 5000 - 1000);
}
- @Test
- public void testCreatSystemTopicClientWithRetry() throws Exception {
- SystemTopicBasedTopicPoliciesService service =
- spy((SystemTopicBasedTopicPoliciesService)
pulsar.getTopicPoliciesService());
- Field field = SystemTopicBasedTopicPoliciesService.class
- .getDeclaredField("namespaceEventsSystemTopicFactory");
- field.setAccessible(true);
- NamespaceEventsSystemTopicFactory factory =
spy((NamespaceEventsSystemTopicFactory) field.get(service));
- SystemTopicClient<PulsarEvent> client =
mock(TopicPoliciesSystemTopicClient.class);
-
doReturn(client).when(factory).createTopicPoliciesSystemTopicClient(any());
- field.set(service, factory);
-
- SystemTopicClient.Reader<PulsarEvent> reader =
mock(SystemTopicClient.Reader.class);
- // Throw an exception first, create successfully after retrying
- doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
-
.doReturn(CompletableFuture.completedFuture(reader)).when(client).newReaderAsync();
-
- SystemTopicClient.Reader<PulsarEvent> reader1 =
service.createSystemTopicClientWithRetry(null).get();
-
- assertEquals(reader1, reader);
- }
-
@Test
public void testGetTopicPoliciesWithRetry() throws Exception {
Field initMapField =
SystemTopicBasedTopicPoliciesService.class.getDeclaredField("policyCacheInitMap");
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
index 8d416125fd1..acda1b6f6a5 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
@@ -111,6 +111,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
properties.setProperty("useTls", "false");
final String topicName = getTopicWithRandomSuffix("non-durable");
+ admin.topics().createNonPartitionedTopic(topicName);
int numberOfMessages = 10;
@Cleanup("shutdownNow")
@@ -202,6 +203,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
properties.setProperty("useTls", "false");
final String topicName = getTopicWithRandomSuffix("reader");
+ admin.topics().createNonPartitionedTopic(topicName);
int numberOfMessages = 10;
@Cleanup("shutdownNow")
@@ -251,6 +253,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
properties.setProperty("useTls", "false");
final String topicName = getTopicWithRandomSuffix("encryption");
+ admin.topics().createNonPartitionedTopic(topicName);
final String keyUriBase =
"file:../pulsar-broker/src/test/resources/certificate/";
final int numberOfMessages = 10;