This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new a08c8ee5ccb [fix][broker] Allow Access to System Topic Metadata for
Reader Creation Post-Namespace Deletion (#20304)
a08c8ee5ccb is described below
commit a08c8ee5ccbba279224e43c7c7afa8c4a00031bb
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri May 12 19:29:42 2023 +0800
[fix][broker] Allow Access to System Topic Metadata for Reader Creation
Post-Namespace Deletion (#20304)
## Motivation
When the segmented snapshot feature is enabled, deleting a topic requires
creating readers. These readers should be opened and closed on demand, which
means we should not create and store them in advance. However, once namespace
deletion has started, requests for partitioned topic metadata and topic
lookups are currently rejected, which makes it impossible to create readers.
As a result, the namespace
cannot be deleted.
## Modification
Allow system topic metadata and lookup requests after namespace deletion has
started, so that readers can be created to clean up topic data.
(cherry picked from commit 5d5ec947249df50bf35a78b6a2a0d3b00d97ca66)
---
.../broker/admin/impl/PersistentTopicsBase.java | 14 ++++++-
.../pulsar/broker/lookup/TopicLookupBase.java | 44 ++++++++++++----------
.../pulsar/broker/transaction/TransactionTest.java | 21 +++++++++++
3 files changed, 58 insertions(+), 21 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index fcade8270cb..de75c5f2132 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;
+import static org.apache.pulsar.common.naming.SystemTopicNames.isSystemTopic;
import static
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign;
import static
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
import static
org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
@@ -4408,8 +4409,13 @@ public class PersistentTopicsBase extends AdminResource {
// validates global-namespace contains local/peer cluster: if
peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so,
client fails while creating
// producer/consumer
+ // It is necessary for system topic operations because system topics
are used to store metadata
+ // and other vital information. Even after namespace deletion has
started,
+ // we need to access the metadata of system topics to create readers
and clean up topic data.
+ // If we don't do this, it can prevent namespace deletion due to
inaccessible readers.
authorizationFuture.thenCompose(__ ->
- checkLocalOrGetPeerReplicationCluster(pulsar,
topicName.getNamespaceObject()))
+ checkLocalOrGetPeerReplicationCluster(pulsar,
topicName.getNamespaceObject(),
+ SystemTopicNames.isSystemTopic(topicName)))
.thenCompose(res ->
pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
@@ -4436,7 +4442,11 @@ public class PersistentTopicsBase extends AdminResource {
// validates global-namespace contains local/peer cluster: if
peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so,
client fails while creating
// producer/consumer
- checkLocalOrGetPeerReplicationCluster(pulsar,
topicName.getNamespaceObject())
+ // It is necessary for system topic operations because system topics
are used to store metadata
+ // and other vital information. Even after namespace deletion has
started,
+ // we need to access the metadata of system topics to create readers
and clean up topic data.
+ // If we don't do this, it can prevent namespace deletion due to
inaccessible readers.
+ checkLocalOrGetPeerReplicationCluster(pulsar,
topicName.getNamespaceObject(), isSystemTopic(topicName))
.thenCompose(res -> pulsar.getBrokerService()
.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index 3b64d2a9f83..bd70201cba5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -41,6 +41,7 @@ import
org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
@@ -221,26 +222,31 @@ public class TopicLookupBase extends PulsarWebResource {
// (2) authorize client
checkAuthorizationAsync(pulsarService, topicName, clientAppId,
authenticationData).thenRun(() -> {
// (3) validate global namespace
+ // It is necessary for system topic operations because
system topics are used to store metadata
+ // and other vital information. Even after namespace
deletion has started,
+ // we need to access the metadata of system topics to
create readers and clean up topic data.
+ // If we don't do this, it can prevent namespace
deletion due to inaccessible readers.
checkLocalOrGetPeerReplicationCluster(pulsarService,
-
topicName.getNamespaceObject()).thenAccept(peerClusterData -> {
- if (peerClusterData == null) {
- // (4) all validation passed: initiate lookup
- validationFuture.complete(null);
- return;
- }
- // if peer-cluster-data is present it means
namespace is owned by that peer-cluster and
- // request should be redirect to the peer-cluster
- if
(StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
- &&
StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
-
validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
- "Redirected cluster's brokerService
url is not configured",
- requestId));
- return;
- }
-
validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
- peerClusterData.getBrokerServiceUrlTls(),
true, LookupType.Redirect,
- requestId,
- false));
+ topicName.getNamespaceObject(),
SystemTopicNames.isSystemTopic(topicName))
+ .thenAccept(peerClusterData -> {
+ if (peerClusterData == null) {
+ // (4) all validation passed: initiate
lookup
+ validationFuture.complete(null);
+ return;
+ }
+ // if peer-cluster-data is present it
means namespace is owned by that peer-cluster
+ // and request should be redirected to the
peer-cluster
+ if
(StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
+ &&
StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
+
validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
+ "Redirected cluster's
brokerService url is not configured",
+ requestId));
+ return;
+ }
+
validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
+
peerClusterData.getBrokerServiceUrlTls(), true,
+ LookupType.Redirect, requestId,
+ false));
}).exceptionally(ex -> {
Throwable throwable =
FutureUtil.unwrapCompletionException(ex);
if (throwable instanceof RestException
restException){
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index c3533e70cf8..c4ec2ec766e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -272,6 +272,27 @@ public class TransactionTest extends TransactionTestBase {
}
}
+ @Test
+ public void testCanDeleteNamespaceWhenEnableTxnSegmentedSnapshot() throws
Exception {
+ // Enable the segmented snapshot feature
+
pulsarServiceList.get(0).getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
+
pulsarServiceList.get(0).getConfig().setForceDeleteNamespaceAllowed(true);
+
+ // Create a new namespace
+ String namespaceName = TENANT +
"/testSegmentedSnapshotWithoutCreatingOldSnapshotTopic";
+ admin.namespaces().createNamespace(namespaceName);
+
+ // Create a new topic in the namespace
+ String topicName = "persistent://" + namespaceName + "/newTopic";
+ @Cleanup
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
+ producer.close();
+
+ // Delete the namespace; this is the operation under test, not cleanup
+ admin.namespaces().deleteNamespace(namespaceName, true);
+
pulsarServiceList.get(0).getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
+ }
+
@Test
public void brokerNotInitTxnManagedLedgerTopic() throws Exception {
String subName = "test";