This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 06fdbe69d91 [cherry-pick][branch-2.9] Cherry-pick #18307 (Fix can not
delete namespace by force) (#18803)
06fdbe69d91 is described below
commit 06fdbe69d9149bc85cc648cbf3293878f68022e7
Author: Xiangying Meng <[email protected]>
AuthorDate: Thu Dec 8 17:22:31 2022 +0800
[cherry-pick][branch-2.9] Cherry-pick #18307 (Fix can not delete namespace
by force) (#18803)
### Motivation
Cherry-pick https://github.com/apache/pulsar/pull/18307 for release 2.9.4.
### Modifications
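Cherry-pick https://github.com/apache/pulsar/pull/18307 for release 2.9.4.
When a namespace is force-deleted, system topics are now deleted only after
all other topics, and deleting a system topic no longer attempts to delete
topic policies.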
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 83 ++++++++++++++++++----
.../broker/service/persistent/PersistentTopic.java | 54 ++++++++------
.../apache/pulsar/broker/admin/NamespacesTest.java | 65 +++++++++++++++++
.../broker/transaction/TransactionProduceTest.java | 29 --------
.../pulsar/broker/transaction/TransactionTest.java | 8 +--
5 files changed, 167 insertions(+), 72 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0b535b67960..7e2fa1ada7b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets.SetView;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -48,6 +49,7 @@ import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang.mutable.MutableObject;
import org.apache.commons.lang3.StringUtils;
@@ -60,6 +62,7 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
@@ -472,11 +475,17 @@ public abstract class NamespacesBase extends
AdminResource {
if (!topics.isEmpty()) {
Set<String> partitionedTopics = new HashSet<>();
Set<String> nonPartitionedTopics = new HashSet<>();
+ Set<String> allSystemTopics = new HashSet<>();
+ Set<String> allPartitionedSystemTopics = new HashSet<>();
for (String topic : topics) {
try {
TopicName topicName = TopicName.get(topic);
if (topicName.isPartitioned()) {
+ if
(pulsar().getBrokerService().isSystemTopic(topicName)) {
+ allPartitionedSystemTopics.add(topic);
+ continue;
+ }
String partitionedTopic =
topicName.getPartitionedTopicName();
if (!partitionedTopics.contains(partitionedTopic))
{
// Distinguish partitioned topic to avoid
duplicate deletion of the same schema
@@ -485,6 +494,10 @@ public abstract class NamespacesBase extends AdminResource
{
partitionedTopics.add(partitionedTopic);
}
} else {
+ if
(pulsar().getBrokerService().isSystemTopic(topicName)) {
+ allSystemTopics.add(topic);
+ continue;
+ }
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
topic, true, true));
nonPartitionedTopics.add(topic);
@@ -508,21 +521,24 @@ public abstract class NamespacesBase extends
AdminResource {
}
final CompletableFuture<Throwable> topicFutureEx =
- FutureUtil.waitForAll(topicFutures).handle((result,
exception) -> {
- if (exception != null) {
- if (exception.getCause() instanceof
PulsarAdminException) {
- asyncResponse
- .resume(new
RestException((PulsarAdminException) exception.getCause()));
- } else {
- log.error("[{}] Failed to remove
forcefully owned namespace {}",
- clientAppId(), namespaceName,
exception);
- asyncResponse.resume(new
RestException(exception.getCause()));
- }
- return exception;
- }
-
- return null;
- });
+ FutureUtil.waitForAll(topicFutures)
+ .thenCompose(ignore ->
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+ .thenCompose(ignore ->
internalDeleteTopicsAsync(allSystemTopics))
+ .handle((result, exception) -> {
+ if (exception != null) {
+ if (exception.getCause() instanceof
PulsarAdminException) {
+ asyncResponse.resume(
+ new
RestException((PulsarAdminException) exception.getCause()));
+ } else {
+ log.error("[{}] Failed to remove
forcefully owned namespace {}",
+ clientAppId(),
namespaceName, exception);
+ asyncResponse.resume(new
RestException(exception.getCause()));
+ }
+ return exception;
+ }
+
+ return null;
+ });
if (topicFutureEx.join() != null) {
return;
}
@@ -566,6 +582,43 @@ public abstract class NamespacesBase extends AdminResource
{
});
}
+ private CompletableFuture<Void>
internalDeletePartitionedTopicsAsync(Set<String> topicNames) {
+ if (CollectionUtils.isEmpty(topicNames)) {
+ return CompletableFuture.completedFuture(null);
+ }
+ PulsarAdmin admin;
+ try {
+ admin = pulsar().getAdminClient();
+ } catch (Exception ex) {
+ log.error("[{}] Get admin client error when preparing to delete
topics.", clientAppId(), ex);
+ return FutureUtil.failedFuture(ex);
+ }
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ for (String topicName : topicNames) {
+ TopicName tn = TopicName.get(topicName);
+ String partitionedTopic = tn.getPartitionedTopicName();
+
futures.add(admin.topics().deletePartitionedTopicAsync(partitionedTopic, true,
true));
+ }
+ return FutureUtil.waitForAll(futures);
+ }
+ private CompletableFuture<Void> internalDeleteTopicsAsync(Set<String>
topicNames) {
+ if (CollectionUtils.isEmpty(topicNames)) {
+ return CompletableFuture.completedFuture(null);
+ }
+ PulsarAdmin admin;
+ try {
+ admin = pulsar().getAdminClient();
+ } catch (Exception ex) {
+ log.error("[{}] Get admin client error when preparing to delete
topics.", clientAppId(), ex);
+ return FutureUtil.failedFuture(ex);
+ }
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ for (String topicName : topicNames) {
+ futures.add(admin.topics().deleteAsync(topicName, true));
+ }
+ return FutureUtil.waitForAll(futures);
+ }
+
protected void internalDeleteNamespaceBundle(String bundleRange, boolean
authoritative, boolean force) {
if (force) {
internalDeleteNamespaceBundleForcefully(bundleRange,
authoritative);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f630ed32d47..bcf81fb4625 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1176,31 +1176,39 @@ public class PersistentTopic extends AbstractTopic
return null;
});
- closeClientFuture.thenAccept(delete -> {
- CompletableFuture<Void> deleteTopicAuthenticationFuture = new
CompletableFuture<>();
- brokerService.deleteTopicAuthenticationWithRetry(topic,
deleteTopicAuthenticationFuture, 5);
- deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema
? deleteSchema() :
- CompletableFuture.completedFuture(null))
- .thenCompose(__ -> deleteTopicPolicies())
- .thenCompose(__ -> transactionBufferCleanupAndClose())
- .whenComplete((v, ex) -> {
- if (ex != null) {
- log.error("[{}] Error deleting topic", topic,
ex);
- unfenceTopicToResume();
- deleteFuture.completeExceptionally(ex);
- } else {
- List<CompletableFuture<Void>>
subsDeleteFutures = new ArrayList<>();
- subscriptions.forEach((sub, p) ->
subsDeleteFutures.add(unsubscribe(sub)));
-
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
- if (e != null) {
- log.error("[{}] Error deleting topic",
topic, e);
+ closeClientFuture.thenAccept(__ -> {
+ CompletableFuture<Void> deleteTopicAuthenticationFuture =
new CompletableFuture<>();
+ brokerService.deleteTopicAuthenticationWithRetry(topic,
deleteTopicAuthenticationFuture, 5);
+ deleteTopicAuthenticationFuture.thenCompose(ignore ->
deleteSchema ? deleteSchema() :
+
CompletableFuture.completedFuture(null))
+ .thenCompose(ignore -> {
+ if
(!this.getBrokerService().getPulsar().getBrokerService()
+
.isSystemTopic(TopicName.get(topic))) {
+ return deleteTopicPolicies();
+ } else {
+ return
CompletableFuture.completedFuture(null);
+ }
+ })
+ .thenCompose(ignore ->
transactionBufferCleanupAndClose())
+ .whenComplete((v, ex) -> {
+ if (ex != null) {
+ log.error("[{}] Error deleting topic",
topic, ex);
unfenceTopicToResume();
- deleteFuture.completeExceptionally(e);
+ deleteFuture.completeExceptionally(ex);
} else {
- ledger.asyncDelete(new
AsyncCallbacks.DeleteLedgerCallback() {
- @Override
- public void
deleteLedgerComplete(Object ctx) {
-
brokerService.removeTopicFromCache(PersistentTopic.this);
+ List<CompletableFuture<Void>>
subsDeleteFutures = new ArrayList<>();
+ subscriptions.forEach((sub, p) ->
subsDeleteFutures.add(unsubscribe(sub)));
+
+
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
+ if (e != null) {
+ log.error("[{}] Error deleting
topic", topic, e);
+ unfenceTopicToResume();
+
deleteFuture.completeExceptionally(e);
+ } else {
+ ledger.asyncDelete(new
AsyncCallbacks.DeleteLedgerCallback() {
+ @Override
+ public void
deleteLedgerComplete(Object ctx) {
+
brokerService.removeTopicFromCache(PersistentTopic.this);
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index c4bef2776a2..b5b57769e3b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -62,6 +62,7 @@ import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.v1.Namespaces;
import org.apache.pulsar.broker.admin.v1.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -69,6 +70,7 @@ import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnershipCache;
+import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -79,9 +81,11 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BundlesData;
@@ -105,6 +109,7 @@ import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -1771,4 +1776,64 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
BundlesData bundles = admin.namespaces().getBundles(namespace);
assertEquals(bundles.getNumBundles(), 14);
}
+ @Test
+ public void testFinallyDeleteSystemTopicWhenDeleteNamespace() throws
Exception {
+ String namespace = this.testTenant + "/delete-namespace";
+ String topic = TopicName.get(TopicDomain.persistent.toString(),
this.testTenant, "delete-namespace",
+ "testFinallyDeleteSystemTopicWhenDeleteNamespace").toString();
+
+ // 0. enable topic level policies and system topic
+ pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+ pulsar.getConfig().setSystemTopicEnabled(true);
+ pulsar.getConfig().setForceDeleteNamespaceAllowed(true);
+ Field policesService =
PulsarService.class.getDeclaredField("topicPoliciesService");
+ policesService.setAccessible(true);
+ policesService.set(pulsar, new
SystemTopicBasedTopicPoliciesService(pulsar));
+
+ // 1. create a test namespace.
+ admin.namespaces().createNamespace(namespace);
+ // 2. create a test topic.
+ admin.topics().createNonPartitionedTopic(topic);
+ // 3. change policy of the topic.
+ admin.topicPolicies().setMaxConsumers(topic, 5);
+ // 4. change the order of the topics in this namespace.
+ List<String> topics =
pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(namespace)).get();
+ Assert.assertTrue(topics.size() >= 2);
+ for (int i = 0; i < topics.size(); i++) {
+ if
(topics.get(i).contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)) {
+ String systemTopic = topics.get(i);
+ topics.set(i, topics.get(0));
+ topics.set(0, systemTopic);
+ }
+ }
+ NamespaceService mockNamespaceService =
spy(pulsar.getNamespaceService());
+ Field namespaceServiceField =
PulsarService.class.getDeclaredField("nsService");
+ namespaceServiceField.setAccessible(true);
+ namespaceServiceField.set(pulsar, mockNamespaceService);
+
doReturn(CompletableFuture.completedFuture(topics)).when(mockNamespaceService).getFullListOfTopics(any());
+ // 5. delete the namespace
+ admin.namespaces().deleteNamespace(namespace, true);
+ }
+
+ @Test
+ public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws
Exception {
+ String namespace = this.testTenant + "/delete-systemTopic";
+ String topic = TopicName.get(TopicDomain.persistent.toString(),
this.testTenant, "delete-systemTopic",
+ "testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
+
+ // 0. enable topic level policies and system topic
+ pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+ pulsar.getConfig().setSystemTopicEnabled(true);
+ Field policesService =
PulsarService.class.getDeclaredField("topicPoliciesService");
+ policesService.setAccessible(true);
+ policesService.set(pulsar, new
SystemTopicBasedTopicPoliciesService(pulsar));
+ // 1. create a test namespace.
+ admin.namespaces().createNamespace(namespace);
+ // 2. create a test topic.
+ admin.topics().createNonPartitionedTopic(topic);
+ // 3. change policy of the topic.
+ admin.topicPolicies().setMaxConsumers(topic, 5);
+ // 4. delete the policies topic; the topic will not try to clear topic
policies
+ admin.topics().delete(namespace + "/" +
EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 316c2f9f9df..cbae03b1a8b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -89,35 +89,6 @@ public class TransactionProduceTest extends
TransactionTestBase {
produceTest(true);
}
- @Test
- public void testDeleteNamespaceBeforeCommit() throws Exception {
- final String topic = "persistent://" + NAMESPACE3 +
"/testDeleteTopicBeforeCommit";
- PulsarClient pulsarClient = this.pulsarClient;
- Transaction tnx = pulsarClient.newTransaction()
- .withTransactionTimeout(60, TimeUnit.SECONDS)
- .build().get();
- long txnIdMostBits = ((TransactionImpl) tnx).getTxnIdMostBits();
- long txnIdLeastBits = ((TransactionImpl) tnx).getTxnIdLeastBits();
- Assert.assertTrue(txnIdMostBits > -1);
- Assert.assertTrue(txnIdLeastBits > -1);
-
- @Cleanup
- Producer<byte[]> outProducer = pulsarClient
- .newProducer()
- .topic(topic)
- .sendTimeout(0, TimeUnit.SECONDS)
- .enableBatching(false)
- .create();
-
- String content = "Hello Txn";
- outProducer.newMessage(tnx).value(content.getBytes(UTF_8)).send();
-
- try {
- admin.namespaces().deleteNamespace(NAMESPACE3, true);
- } catch (Exception ignore) { }
- tnx.commit().get();
- }
-
@Test
public void produceAndAbortTest() throws Exception {
produceTest(false);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 1e9ed80307d..1ca63d59bb7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -39,7 +39,6 @@ import io.netty.buffer.Unpooled;
import io.netty.util.Timeout;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
@@ -70,8 +69,6 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.intercept.BrokerInterceptor;
-import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
@@ -159,7 +156,8 @@ public class TransactionTest extends TransactionTestBase {
public void testCreateTransactionSystemTopic() throws Exception {
String subName = "test";
String topicName = TopicName.get(NAMESPACE1 + "/" +
"testCreateTransactionSystemTopic").toString();
-
+ admin.namespaces().deleteNamespace(NAMESPACE1, true);
+ admin.namespaces().createNamespace(NAMESPACE1);
try {
// init pending ack
@Cleanup
@@ -175,7 +173,7 @@ public class TransactionTest extends TransactionTestBase {
// getList does not include transaction system topic
List<String> list = admin.topics().getList(NAMESPACE1);
- assertEquals(list.size(), 4);
+ assertFalse(list.isEmpty());
list.forEach(topic ->
assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));
try {