This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2e3e78d992e [cleanup] PIP-457: Remove NamespaceName.isGlobal() and
TopicName.isGlobal() (#25319)
2e3e78d992e is described below
commit 2e3e78d992e873e358d16fa875f79cb607019284
Author: Matteo Merli <[email protected]>
AuthorDate: Sun Mar 15 23:58:25 2026 -0700
[cleanup] PIP-457: Remove NamespaceName.isGlobal() and TopicName.isGlobal()
(#25319)
---
.../apache/pulsar/broker/admin/AdminResource.java | 2 +-
.../broker/admin/impl/PersistentTopicsBase.java | 349 ++++++---------------
.../broker/admin/v2/NonPersistentTopics.java | 14 +-
.../pulsar/broker/service/BrokerService.java | 3 -
.../service/nonpersistent/NonPersistentTopic.java | 58 ++--
.../broker/service/persistent/PersistentTopic.java | 84 +++--
.../pulsar/broker/web/PulsarWebResource.java | 2 +-
.../apache/pulsar/broker/admin/NamespacesTest.java | 3 +-
.../apache/pulsar/common/naming/NamespaceName.java | 4 -
.../org/apache/pulsar/common/naming/TopicName.java | 4 -
.../pulsar/common/naming/NamespaceNameTest.java | 2 -
11 files changed, 169 insertions(+), 356 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index ea8e584c6e3..190ed011143 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -597,7 +597,7 @@ public abstract class AdminResource extends
PulsarWebResource {
.thenCompose(__ ->
provisionPartitionedTopicPath(numPartitions, createLocalTopicOnly, properties))
.thenCompose(__ -> tryCreatePartitionsAsync(numPartitions))
.thenRun(() -> {
- if (!createLocalTopicOnly && topicName.isGlobal()
+ if (!createLocalTopicOnly
&&
pulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
internalCreatePartitionedTopicToReplicatedClustersInBackground(numPartitions);
log.info("[{}] Successfully created partitioned for
topic {} for the remote clusters",
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index fed59cf06e2..ad6854e2426 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -328,11 +328,9 @@ public class PersistentTopicsBase extends AdminResource {
protected CompletableFuture<Void>
internalCreateNonPartitionedTopicAsync(boolean authoritative,
Map<String, String>
properties) {
- CompletableFuture<Void> ret =
validateNonPartitionTopicNameAsync(topicName.getLocalName());
- if (topicName.isGlobal()) {
- ret = ret.thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName));
- }
- return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ return validateNonPartitionTopicNameAsync(topicName.getLocalName())
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(__ ->
validateNamespaceOperationAsync(topicName.getNamespaceObject(),
NamespaceOperation.CREATE_TOPIC))
.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
false, false))
@@ -472,7 +470,7 @@ public class PersistentTopicsBase extends AdminResource {
})
);
}).thenCompose(__ -> {
- if (updateLocal || !topicName.isGlobal()) {
+ if (updateLocal) {
return CompletableFuture.completedFuture(null);
}
// update remote cluster
@@ -531,9 +529,9 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalCreateMissedPartitions(AsyncResponse asyncResponse)
{
getPartitionedTopicMetadataAsync(topicName, false,
false).thenAccept(metadata -> {
if (metadata != null && metadata.partitions > 0) {
- CompletableFuture<Void> future =
validateNamespaceOperationAsync(topicName.getNamespaceObject(),
- NamespaceOperation.CREATE_TOPIC);
- future.thenCompose(__ ->
tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> {
+ validateNamespaceOperationAsync(topicName.getNamespaceObject(),
+ NamespaceOperation.CREATE_TOPIC)
+ .thenCompose(__ ->
tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> {
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
log.error("[{}] Failed to create partitions for topic {}",
clientAppId(), topicName);
@@ -907,13 +905,9 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean
authoritative) {
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.UNLOAD);
- future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- }
- return CompletableFuture.completedFuture(null);
- }).thenAccept(__ -> {
+ validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenAccept(__ -> {
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
if (isTransactionCoordinatorAssign(topicName)) {
@@ -1182,14 +1176,9 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalGetSubscriptions(AsyncResponse asyncResponse,
boolean authoritative) {
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS);
- future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ validateTopicOperationAsync(topicName,
TopicOperation.GET_SUBSCRIPTIONS)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenAccept(__ -> {
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
@@ -1312,28 +1301,18 @@ public class PersistentTopicsBase extends AdminResource
{
protected CompletableFuture<? extends TopicStats>
internalGetStatsAsync(boolean authoritative,
GetStatsOptions getStatsOptions) {
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
- return future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ return validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> topic.asyncGetStats(getStatsOptions));
}
protected CompletableFuture<PersistentTopicInternalStats>
internalGetInternalStatsAsync(boolean authoritative,
boolean metadata) {
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
- return future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ return validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(__ -> {
if (metadata) {
return validateTopicOperationAsync(topicName,
TopicOperation.GET_METADATA);
@@ -1345,13 +1324,9 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse,
boolean authoritative) {
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
- future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- }
- return CompletableFuture.completedFuture(null);
- }).thenAccept(__ -> {
+ validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenAccept(__ -> {
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);
@@ -1463,13 +1438,9 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalGetPartitionedStats(AsyncResponse asyncResponse,
boolean authoritative, boolean perPartition,
GetStatsOptions
getStatsOptions) {
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
- future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- }
- return CompletableFuture.completedFuture(null);
- }).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
+ validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false)).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
@@ -1553,14 +1524,9 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalGetPartitionedStatsInternal(AsyncResponse
asyncResponse, boolean authoritative) {
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
- future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false))
+ validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false))
.thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
@@ -1611,14 +1577,9 @@ public class PersistentTopicsBase extends AdminResource {
protected CompletableFuture<Void> internalDeleteSubscriptionAsync(String
subName,
boolean
authoritative,
boolean
force) {
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.UNSUBSCRIBE, subName);
- return future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenCompose(__ -> {
+ return validateTopicOperationAsync(topicName,
TopicOperation.UNSUBSCRIBE, subName)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> {
if (topicName.isPartitioned()) {
return
internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative,
force);
} else {
@@ -1790,14 +1751,9 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalSkipAllMessages(AsyncResponse asyncResponse, String
subName, boolean authoritative) {
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName);
- future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(__ -> {
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
@@ -1903,14 +1859,9 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalSkipMessages(AsyncResponse asyncResponse, String
subName, int numMessages,
boolean authoritative) {
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName);
- future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false))
.thenCompose(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
@@ -1967,14 +1918,9 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalExpireMessagesForAllSubscriptions(AsyncResponse
asyncResponse, int expireTimeInSeconds,
boolean authoritative) {
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES);
- future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false))
+ validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false))
.thenAccept(partitionMetadata -> {
if (topicName.isPartitioned()) {
internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
@@ -2102,14 +2048,9 @@ public class PersistentTopicsBase extends AdminResource {
protected CompletableFuture<Void> internalResetCursorAsync(String subName,
long timestamp,
boolean authoritative) {
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName);
- return future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ return validateTopicOperationAsync(topicName,
TopicOperation.RESET_CURSOR, subName)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(__ -> {
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
@@ -2206,14 +2147,9 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalCreateSubscription(AsyncResponse asyncResponse,
String subscriptionName,
MessageIdImpl messageId, boolean authoritative, boolean
replicated, Map<String, String> properties) {
- CompletableFuture<Void> ret = validateTopicOperationAsync(topicName,
TopicOperation.SUBSCRIBE,
- subscriptionName);
- ret.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- }
- return CompletableFuture.completedFuture(null);
- }).thenAccept(__ -> {
+ validateTopicOperationAsync(topicName, TopicOperation.SUBSCRIBE,
subscriptionName)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenAccept(__ -> {
final MessageIdImpl targetMessageId = messageId == null ?
(MessageIdImpl) MessageId.latest : messageId;
log.info("[{}][{}] Creating subscription {} at message id {} with
properties {}", clientAppId(),
topicName, subscriptionName, targetMessageId, properties);
@@ -2372,13 +2308,9 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalUpdateSubscriptionProperties(AsyncResponse
asyncResponse, String subName,
Map<String, String>
subscriptionProperties,
boolean authoritative)
{
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.SUBSCRIBE, subName);
- future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- }
- return CompletableFuture.completedFuture(null);
- }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative)).thenAccept(__ -> {
+ validateTopicOperationAsync(topicName, TopicOperation.SUBSCRIBE,
subName)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative)).thenAccept(__ -> {
if (topicName.isPartitioned()) {
internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse,
subName,
subscriptionProperties, authoritative);
@@ -2450,13 +2382,9 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalAnalyzeSubscriptionBacklog(AsyncResponse
asyncResponse, String subName,
Optional<Position>
position,
boolean authoritative) {
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName);
- future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- }
- return CompletableFuture.completedFuture(null);
- }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(__ -> {
if (topicName.isPartitioned()) {
return CompletableFuture.completedFuture(null);
@@ -2488,13 +2416,9 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalGetSubscriptionProperties(AsyncResponse
asyncResponse, String subName,
boolean authoritative)
{
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName);
- future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- }
- return CompletableFuture.completedFuture(null);
- }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative)).thenAccept(__ -> {
+ validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative)).thenAccept(__ -> {
if (topicName.isPartitioned()) {
internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
authoritative);
@@ -2572,8 +2496,8 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalResetCursorOnPosition(AsyncResponse asyncResponse,
String subName, boolean authoritative,
MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
- CompletableFuture<Void> ret = validateTopicOperationAsync(topicName,
TopicOperation.RESET_CURSOR, subName);
- ret.thenCompose(__ -> {
+ validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR,
subName)
+ .thenCompose(__ -> {
// If the topic name is a partition name, no need to get partition
topic metadata again
if (!topicName.isPartitioned()) {
return getPartitionedTopicMetadataAsync(topicName,
authoritative, false)
@@ -2589,13 +2513,8 @@ public class PersistentTopicsBase extends AdminResource {
} else {
return CompletableFuture.completedFuture(null);
}
- }).thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenCompose(__ -> {
+ }).thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> {
log.info("[{}][{}] received reset cursor on subscription {} to
position {}", clientAppId(), topicName,
subName, messageId);
return validateTopicOwnershipAsync(topicName, authoritative);
@@ -2746,14 +2665,9 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Response> internalGetMessageById(long
ledgerId, long entryId, boolean authoritative) {
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
- return future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenCompose(__ -> {
+ return validateTopicOperationAsync(topicName,
TopicOperation.PEEK_MESSAGES)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> {
if (topicName.isPartitioned()) {
return CompletableFuture.completedFuture(null);
} else {
@@ -2809,14 +2723,9 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<MessageId>
internalGetMessageIdByTimestampAsync(long timestamp, boolean authoritative) {
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
- return future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenCompose(__ -> {
+ return validateTopicOperationAsync(topicName,
TopicOperation.PEEK_MESSAGES)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> {
if (topicName.isPartitioned()) {
return CompletableFuture.completedFuture(null);
} else {
@@ -2880,8 +2789,8 @@ public class PersistentTopicsBase extends AdminResource {
protected CompletableFuture<Response> internalPeekNthMessageAsync(String
subName, int messagePosition,
boolean
authoritative) {
- CompletableFuture<Void> ret = validateTopicOperationAsync(topicName,
TopicOperation.PEEK_MESSAGES, subName);
- return ret.thenCompose(__ -> {
+ return validateTopicOperationAsync(topicName,
TopicOperation.PEEK_MESSAGES, subName)
+ .thenCompose(__ -> {
// If the topic name is a partition name, no need to get partition
topic metadata again
if (!topicName.isPartitioned()) {
return getPartitionedTopicMetadataAsync(topicName,
authoritative, false)
@@ -2939,14 +2848,9 @@ public class PersistentTopicsBase extends AdminResource {
boolean
authoritative) {
long messagePositionLocal = messagePosition < 1 ? 1 : messagePosition;
String initialPositionLocal = initialPosition == null ? "latest" :
initialPosition;
- CompletableFuture<Void> ret = validateTopicOperationAsync(topicName,
TopicOperation.PEEK_MESSAGES);
- return ret.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ return validateTopicOperationAsync(topicName,
TopicOperation.PEEK_MESSAGES)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(__ -> {
if (!topicName.isPartitioned()) {
return getPartitionedTopicMetadataAsync(topicName,
authoritative, false)
@@ -3174,15 +3078,10 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected CompletableFuture<PersistentOfflineTopicStats>
internalGetBacklogAsync(boolean authoritative) {
- CompletableFuture<Void> ret;
- if (topicName.isGlobal()) {
- ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- ret = CompletableFuture.completedFuture(null);
- }
// Validate that namespace exists, throw 404 if it doesn't exist
// note that we do not want to load the topic and hence skip
authorization check
- return ret.thenCompose(__ ->
namespaceResources().getPoliciesAsync(namespaceName))
+ return validateGlobalNamespaceOwnershipAsync(namespaceName)
+ .thenCompose(__ ->
namespaceResources().getPoliciesAsync(namespaceName))
.thenCompose(__ -> {
PersistentOfflineTopicStats offlineTopicStats =
pulsar().getBrokerService().getOfflineTopicStat(topicName);
@@ -3245,8 +3144,8 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalGetBacklogSizeByMessageId(AsyncResponse
asyncResponse,
MessageIdImpl messageId,
boolean authoritative) {
- CompletableFuture<Void> ret = validateTopicOperationAsync(topicName,
TopicOperation.GET_BACKLOG_SIZE);
- ret.thenCompose(__ -> {
+ validateTopicOperationAsync(topicName, TopicOperation.GET_BACKLOG_SIZE)
+ .thenCompose(__ -> {
// If the topic name is a partition name, no need to get partition
topic metadata again
if (!topicName.isPartitioned()) {
return getPartitionedTopicMetadataAsync(topicName,
authoritative, false)
@@ -3262,13 +3161,8 @@ public class PersistentTopicsBase extends AdminResource {
} else {
return CompletableFuture.completedFuture(null);
}
- }).thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ }).thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(unused -> getTopicReferenceAsync(topicName))
.thenAccept(t -> {
PersistentTopic topic = (PersistentTopic) t;
@@ -3725,13 +3619,7 @@ public class PersistentTopicsBase extends AdminResource {
return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
"Not allowed to set/get topic policy for a partition"));
}
- CompletableFuture<Void> ret;
- if (topicName.isGlobal()) {
- ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- ret = CompletableFuture.completedFuture(null);
- }
- return ret
+ return validateGlobalNamespaceOwnershipAsync(namespaceName)
.thenCompose(__ -> checkTopicExistsAsync(topicName))
.thenCompose(topicExistsInfo -> {
if (!topicExistsInfo.isExists()) {
@@ -3792,14 +3680,9 @@ public class PersistentTopicsBase extends AdminResource {
"Termination of a system topic is not allowed"));
}
- CompletableFuture<Void> ret = validateTopicOperationAsync(topicName,
TopicOperation.TERMINATE);
- return ret.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ return validateTopicOperationAsync(topicName, TopicOperation.TERMINATE)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false))
.thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
@@ -3825,14 +3708,9 @@ public class PersistentTopicsBase extends AdminResource {
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
msg));
return;
}
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.TERMINATE);
- future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenCompose(unused -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false))
+ validateTopicOperationAsync(topicName, TopicOperation.TERMINATE)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(unused -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false))
.thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
String msg = "Termination of a non-partitioned topic is not
allowed using partitioned-terminate"
@@ -3893,15 +3771,9 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalExpireMessagesByTimestamp(AsyncResponse
asyncResponse, String subName,
int expireTimeInSeconds,
boolean authoritative) {
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES,
- subName);
- future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES,
subName)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(__ ->
// If the topic name is a partition name, no need to get partition
topic metadata again
getPartitionedTopicMetadataAsync(topicName, authoritative, false)
@@ -4056,8 +3928,8 @@ public class PersistentTopicsBase extends AdminResource {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
msg));
return;
}
- CompletableFuture<Void> ret = validateTopicOperationAsync(topicName,
TopicOperation.EXPIRE_MESSAGES, subName);
- ret.thenCompose(__ -> {
+ validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES,
subName)
+ .thenCompose(__ -> {
// If the topic name is a partition name, no need to get partition
topic metadata again
if (!topicName.isPartitioned()) {
return getPartitionedTopicMetadataAsync(topicName,
authoritative, false)
@@ -4072,13 +3944,8 @@ public class PersistentTopicsBase extends AdminResource {
} else {
return CompletableFuture.completedFuture(null);
}
- }).thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
+ }).thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName,
authoritative))
.thenCompose(__ -> {
log.info("[{}][{}] Received expire messages on subscription {} to
position {}", clientAppId(),
topicName, subName, messageId);
@@ -4180,14 +4047,9 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalTriggerCompaction(AsyncResponse asyncResponse,
boolean authoritative) {
log.info("[{}] Trigger compaction on topic {}", clientAppId(),
topicName);
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.COMPACT);
- future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenAccept(__ -> {
+ validateTopicOperationAsync(topicName, TopicOperation.COMPACT)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenAccept(__ -> {
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
internalTriggerCompactionNonPartitionedTopic(asyncResponse,
authoritative);
@@ -4957,13 +4819,6 @@ public class PersistentTopicsBase extends AdminResource {
return;
}
- // Reject the request if the topic is not global
- if (!topicName.isGlobal()) {
- asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
- "Cannot enable/disable replicated subscriptions on
non-global topics"));
- return;
- }
-
// 1.Permission to consume this topic is required
// 2.Redirect the request to the peer-cluster if the local cluster is
not included in the replication clusters
CompletableFuture<Void> validateFuture =
@@ -5097,13 +4952,6 @@ public class PersistentTopicsBase extends AdminResource {
return;
}
- // Reject the request if the topic is not global
- if (!topicName.isGlobal()) {
- asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
- "Cannot get replicated subscriptions on non-global
topics"));
- return;
- }
-
// Permission to consume this topic is required
CompletableFuture<Void> validateFuture =
validateTopicOperationAsync(topicName,
TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS, subName);
@@ -5392,14 +5240,9 @@ public class PersistentTopicsBase extends AdminResource {
"Invalid message index: " + index));
}
int partitionIndex = topicName.getPartitionIndex();
- CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
- return future.thenCompose(__ -> {
- if (topicName.isGlobal()) {
- return
validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }).thenCompose(__ -> {
+ return validateTopicOperationAsync(topicName,
TopicOperation.PEEK_MESSAGES)
+ .thenCompose(__ ->
validateGlobalNamespaceOwnershipAsync(namespaceName))
+ .thenCompose(__ -> {
if (topicName.isPartitioned()) {
return CompletableFuture.completedFuture(null);
} else {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 9071884185d..16cbc7104d4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -239,14 +239,12 @@ public class NonPersistentTopics extends PersistentTopics
{
throw new RestException(Response.Status.PRECONDITION_FAILED,
"Partitioned Topic Name should not contain
'-partition-'");
}
- if (topicName.isGlobal()) {
- try {
- validateGlobalNamespaceOwnership(namespaceName);
- } catch (Exception e) {
- log.error("[{}] Failed to get partitioned stats for {}",
clientAppId(), topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
+ try {
+ validateGlobalNamespaceOwnership(namespaceName);
+ } catch (Exception e) {
+ log.error("[{}] Failed to get partitioned stats for {}",
clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
}
getPartitionedTopicMetadataAsync(topicName,
authoritative, false).thenAccept(partitionMetadata -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 513b16a5255..79a26ec56ae 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2791,9 +2791,6 @@ public class BrokerService implements Closeable {
* @param namespace
*/
private void unloadDeletedReplNamespace(Policies data, NamespaceName
namespace) {
- if (!namespace.isGlobal()) {
- return;
- }
final String localCluster =
this.pulsar.getConfiguration().getClusterName();
if (pulsar.getBrokerService().isCurrentClusterAllowed(namespace,
data)) {
return;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 2b14df499ae..45a92c92f7e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -583,7 +583,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
@Override
public CompletableFuture<Void> checkReplication() {
TopicName name = TopicName.get(topic);
- if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
+ if (NamespaceService.isHeartbeatNamespace(name)
|| ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
return CompletableFuture.completedFuture(null);
}
@@ -990,11 +990,8 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
}
public boolean isActive() {
- if (TopicName.get(topic).isGlobal()) {
- // No local consumers and no local producers
- return !subscriptions.isEmpty() || hasLocalProducers();
- }
- return currentUsageCount() != 0 || !subscriptions.isEmpty();
+ // Active while there is at least one subscription or a local producer
+ return !subscriptions.isEmpty() || hasLocalProducers();
}
@Override
@@ -1050,34 +1047,31 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
} else {
if (System.nanoTime() - lastActive >
TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) {
- if (TopicName.get(topic).isGlobal()) {
- // For global namespace, close repl producers first.
- // Once all repl producers are closed, we can delete the
topic,
- // provided no remote producers connected to the broker.
- if (log.isDebugEnabled()) {
- log.debug("[{}] Global topic inactive for {} seconds,
closing repl producers.", topic,
- maxInactiveDurationInSec);
- }
+ // Close repl producers first.
+ // Once all repl producers are closed, we can delete the topic,
+ // provided no remote producers connected to the broker.
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Topic inactive for {} seconds, closing
repl producers.", topic,
+ maxInactiveDurationInSec);
+ }
- stopReplProducers().thenCompose(v -> delete(true, false))
- .thenCompose(__ ->
tryToDeletePartitionedMetadata())
- .thenRun(() -> log.info("[{}] Topic deleted
successfully due to inactivity", topic))
- .exceptionally(e -> {
- Throwable throwable = e.getCause();
- if (throwable instanceof TopicBusyException) {
- // topic became active again
- if (log.isDebugEnabled()) {
- log.debug("[{}] Did not delete busy
topic: {}", topic,
- throwable.getMessage());
- }
- replicators.forEach((region, replicator)
-> replicator.startProducer());
- } else {
- log.warn("[{}] Inactive topic deletion
failed", topic, e);
+ stopReplProducers().thenCompose(v -> delete(true, false))
+ .thenCompose(__ -> tryToDeletePartitionedMetadata())
+ .thenRun(() -> log.info("[{}] Topic deleted
successfully due to inactivity", topic))
+ .exceptionally(e -> {
+ Throwable throwable = e.getCause();
+ if (throwable instanceof TopicBusyException) {
+ // topic became active again
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Did not delete busy topic:
{}", topic,
+ throwable.getMessage());
}
- return null;
- });
-
- }
+ replicators.forEach((region, replicator) ->
replicator.startProducer());
+ } else {
+ log.warn("[{}] Inactive topic deletion
failed", topic, e);
+ }
+ return null;
+ });
}
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index da4d717e798..dfbb2725db4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1942,7 +1942,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
@Override
public CompletableFuture<Void> checkReplication() {
TopicName name = TopicName.get(topic);
- if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
+ if (NamespaceService.isHeartbeatNamespace(name)
|| ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
return CompletableFuture.completedFuture(null);
}
@@ -2112,7 +2112,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
if (nsPolicies.isPresent()) {
allowedClusters = nsPolicies.get().allowed_clusters;
}
- if (TopicName.get(topic).isGlobal() &&
!topicRepls.contains(localCluster)
+ if (!topicRepls.contains(localCluster)
&& !allowedClusters.contains(localCluster)) {
log.warn("Local cluster {} is not part of global
namespace repl list {} and allowed list {}",
localCluster, topicRepls, allowedClusters);
@@ -3197,12 +3197,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
break;
}
- if (TopicName.get(topic).isGlobal()) {
- // no local producers
- return hasLocalProducers();
- } else {
- return currentUsageCount() != 0;
- }
+ // no local producers
+ return hasLocalProducers();
}
private boolean hasBacklogs(boolean getPreciseBacklog) {
@@ -3410,46 +3406,42 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
} else {
CompletableFuture<Void> replCloseFuture = new
CompletableFuture<>();
- if (TopicName.get(topic).isGlobal()) {
- // For global namespace, close repl producers first.
- // Once all repl producers are closed, we can delete the topic,
- // provided no remote producers connected to the broker.
- if (log.isDebugEnabled()) {
- log.debug("[{}] Global topic inactive for {} seconds,
closing repl producers.", topic,
- maxInactiveDurationInSec);
- }
- /**
- * There is a race condition that may cause a NPE:
- * - task 1: a callback of "replicator.cursor.asyncRead" will
trigger a replication.
- * - task 2: "closeReplProducersIfNoBacklog" called by current
thread will make the variable
- * "replicator.producer" to a null value.
- * Race condition: task 1 will get a NPE when it tries to send
messages using the variable
- * "replicator.producer", because task 2 will set this
variable to "null".
- * TODO Create a seperated PR to fix it.
- */
- closeReplProducersIfNoBacklog().thenRun(() -> {
- if (hasRemoteProducers()) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Global topic has connected remote
producers. Not a candidate for GC",
- topic);
- }
- replCloseFuture
- .completeExceptionally(new
TopicBusyException("Topic has connected remote producers"));
- } else {
- log.info("[{}] Global topic inactive for {} seconds,
closed repl producers", topic,
- maxInactiveDurationInSec);
- replCloseFuture.complete(null);
- }
- }).exceptionally(e -> {
+ // Close repl producers first.
+ // Once all repl producers are closed, we can delete the topic,
+ // provided no remote producers connected to the broker.
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Topic inactive for {} seconds, closing repl
producers.", topic,
+ maxInactiveDurationInSec);
+ }
+ /**
+ * There is a race condition that may cause a NPE:
+ * - task 1: a callback of "replicator.cursor.asyncRead" will
trigger a replication.
+ * - task 2: "closeReplProducersIfNoBacklog" called by current
thread will make the variable
+ * "replicator.producer" to a null value.
+ * Race condition: task 1 will get a NPE when it tries to send
messages using the variable
+ * "replicator.producer", because task 2 will set this variable to
"null".
+ * TODO Create a separate PR to fix it.
+ */
+ closeReplProducersIfNoBacklog().thenRun(() -> {
+ if (hasRemoteProducers()) {
if (log.isDebugEnabled()) {
- log.debug("[{}] Global topic has replication backlog.
Not a candidate for GC", topic);
+ log.debug("[{}] Topic has connected remote producers.
Not a candidate for GC",
+ topic);
}
- replCloseFuture.completeExceptionally(e.getCause());
- return null;
- });
- } else {
- replCloseFuture.complete(null);
- }
+ replCloseFuture
+ .completeExceptionally(new
TopicBusyException("Topic has connected remote producers"));
+ } else {
+ log.info("[{}] Topic inactive for {} seconds, closed repl
producers", topic,
+ maxInactiveDurationInSec);
+ replCloseFuture.complete(null);
+ }
+ }).exceptionally(e -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Topic has replication backlog. Not a
candidate for GC", topic);
+ }
+ replCloseFuture.completeExceptionally(e.getCause());
+ return null;
+ });
replCloseFuture.thenCompose(v -> delete(deleteMode ==
InactiveTopicDeleteMode.delete_when_no_subscriptions,
deleteMode ==
InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, false))
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 407458a1090..3fe51df90f7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -872,7 +872,7 @@ public abstract class PulsarWebResource {
public static CompletableFuture<ClusterDataImpl>
checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService,
NamespaceName namespace,
boolean allowDeletedNamespace) {
- if (!namespace.isGlobal() ||
NamespaceService.isSLAOrHeartbeatNamespace(namespace.toString())) {
+ if (NamespaceService.isSLAOrHeartbeatNamespace(namespace.toString())) {
return CompletableFuture.completedFuture(null);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index ca3f039fea4..c40e8a84c5e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -223,8 +223,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
admin.clusters().createCluster("use",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
admin.clusters().createCluster("usw",
ClusterData.builder().serviceUrl("http://127.0.0.2:8082").build());
admin.clusters().createCluster("usc",
ClusterData.builder().serviceUrl("http://127.0.0.3:8083").build());
- // After V1 removal, all namespaces go through the peer-cluster
redirect path
- // (NamespaceName.isGlobal() always returns true), so peer clusters
must be configured.
+ // All namespaces go through the peer-cluster redirect path, so peer
clusters must be configured.
// Only "usc" is a peer because peer clusters cannot also be
replication clusters.
admin.clusters().updatePeerClusterNames("use",
new LinkedHashSet<>(List.of("usc")));
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java
index a2e13125a96..bda21502dfb 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java
@@ -119,10 +119,6 @@ public class NamespaceName implements ServiceUnitId {
return localName;
}
- public boolean isGlobal() {
- return true;
- }
-
public String getPersistentTopicName(String localTopic) {
return getTopicName(TopicDomain.persistent, localTopic);
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index ad14edabde0..a1c7055a897 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -352,10 +352,6 @@ public class TopicName implements ServiceUnitId {
return String.format("%s/%s/%s/%s", domain, tenant, namespacePortion,
getEncodedLocalName());
}
- public boolean isGlobal() {
- return namespaceName.isGlobal();
- }
-
public String getSchemaName() {
return getTenant()
+ "/" + getNamespacePortion()
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/NamespaceNameTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/NamespaceNameTest.java
index 5d2579ce4f0..24b86daa562 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/NamespaceNameTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/NamespaceNameTest.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.common.naming;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
import org.testng.annotations.Test;
public class NamespaceNameTest {
@@ -114,7 +113,6 @@ public class NamespaceNameTest {
NamespaceName ns = NamespaceName.get("my-tenant/my-namespace");
assertEquals(ns.getTenant(), "my-tenant");
assertEquals(ns.getLocalName(), "my-namespace");
- assertTrue(ns.isGlobal());
assertEquals(ns.getPersistentTopicName("my-topic"),
"persistent://my-tenant/my-namespace/my-topic");
}
}