This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 8138553cefb [cherry-pick][branch-2.11] cherry-pick fixing can not 
delete namespace by force (#18307) (#18826)
8138553cefb is described below

commit 8138553cefbcab3614b2baa24241cb95388e125b
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri Dec 9 20:38:17 2022 +0800

    [cherry-pick][branch-2.11] cherry-pick fixing can not delete namespace by 
force (#18307) (#18826)
    
    ### Motivation
    Cherry-pick (#18307) to release 2.11.1.
    ### Modifications
    
    Cherry-pick (#18307) to release 2.11.1.
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  84 +++++++++++---
 .../broker/service/persistent/PersistentTopic.java | 128 +++++++++++----------
 .../apache/pulsar/broker/admin/NamespacesTest.java |  81 +++++++++++--
 .../broker/transaction/TransactionProduceTest.java |  29 -----
 .../pulsar/broker/transaction/TransactionTest.java |   5 +-
 5 files changed, 212 insertions(+), 115 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 4d8f49be965..60d171a9819 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -50,6 +50,7 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang.mutable.MutableObject;
 import org.apache.commons.lang3.StringUtils;
@@ -62,6 +63,7 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
@@ -471,16 +473,26 @@ public abstract class NamespacesBase extends 
AdminResource {
             if (!topics.isEmpty()) {
                 Set<String> partitionedTopics = new HashSet<>();
                 Set<String> nonPartitionedTopics = new HashSet<>();
+                Set<String> allSystemTopics = new HashSet<>();
+                Set<String> allPartitionedSystemTopics = new HashSet<>();
 
                 for (String topic : topics) {
                     try {
                         TopicName topicName = TopicName.get(topic);
                         if (topicName.isPartitioned()) {
+                            if 
(pulsar().getBrokerService().isSystemTopic(topicName)) {
+                                
allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
+                                continue;
+                            }
                             String partitionedTopic = 
topicName.getPartitionedTopicName();
                             if (!partitionedTopics.contains(partitionedTopic)) 
{
                                 partitionedTopics.add(partitionedTopic);
                             }
                         } else {
+                            if 
(pulsar().getBrokerService().isSystemTopic(topicName)) {
+                                allSystemTopics.add(topic);
+                                continue;
+                            }
                             nonPartitionedTopics.add(topic);
                         }
                         
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true));
@@ -508,21 +520,24 @@ public abstract class NamespacesBase extends 
AdminResource {
                 }
 
                 final CompletableFuture<Throwable> topicFutureEx =
-                        FutureUtil.waitForAll(topicFutures).handle((result, 
exception) -> {
-                            if (exception != null) {
-                                if (exception.getCause() instanceof 
PulsarAdminException) {
-                                    asyncResponse
-                                            .resume(new 
RestException((PulsarAdminException) exception.getCause()));
-                                } else {
-                                    log.error("[{}] Failed to remove 
forcefully owned namespace {}",
-                                            clientAppId(), namespaceName, 
exception);
-                                    asyncResponse.resume(new 
RestException(exception.getCause()));
-                                }
-                                return exception;
-                            }
-
-                            return null;
-                        });
+                        FutureUtil.waitForAll(topicFutures)
+                                .thenCompose((ignore) -> 
internalDeleteTopicsAsync(allSystemTopics))
+                                .thenCompose((ignore) ->
+                                        
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+                                .handle((result, exception) -> {
+                                    if (exception != null) {
+                                        if (exception.getCause() instanceof 
PulsarAdminException) {
+                                            asyncResponse.resume(
+                                                    new 
RestException((PulsarAdminException) exception.getCause()));
+                                        } else {
+                                            log.error("[{}] Failed to remove 
forcefully owned namespace {}",
+                                                    clientAppId(), 
namespaceName, exception);
+                                            asyncResponse.resume(new 
RestException(exception.getCause()));
+                                        }
+                                        return exception;
+                                    }
+                                    return null;
+                                });
                 if (topicFutureEx.join() != null) {
                     return;
                 }
@@ -564,6 +579,45 @@ public abstract class NamespacesBase extends AdminResource 
{
         });
     }
 
+    private CompletableFuture<Void> 
internalDeletePartitionedTopicsAsync(Set<String> topicNames) {
+        log.info("internalDeletePartitionedTopicsAsync");
+        if (CollectionUtils.isEmpty(topicNames)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (Exception ex) {
+            log.error("[{}] Get admin client error when preparing to delete 
topics.", clientAppId(), ex);
+            return FutureUtil.failedFuture(ex);
+        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        for (String topicName : topicNames) {
+            TopicName tn = TopicName.get(topicName);
+            futures.add(admin.topics().deletePartitionedTopicAsync(topicName, 
true, true));
+        }
+        return FutureUtil.waitForAll(futures);
+    }
+    private CompletableFuture<Void> internalDeleteTopicsAsync(Set<String> 
topicNames) {
+        log.info("internalDeleteTopicsAsync");
+        if (CollectionUtils.isEmpty(topicNames)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (Exception ex) {
+            log.error("[{}] Get admin client error when preparing to delete 
topics.", clientAppId(), ex);
+            return FutureUtil.failedFuture(ex);
+        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        for (String topicName : topicNames) {
+            futures.add(admin.topics().deleteAsync(topicName, true, true));
+        }
+        return FutureUtil.waitForAll(futures);
+    }
+
+
     @SuppressWarnings("deprecation")
     protected CompletableFuture<Void> 
internalDeleteNamespaceBundleAsync(String bundleRange, boolean authoritative,
                                                                          
boolean force) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1d1160d7dfb..5ad6891d30c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1178,68 +1178,74 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 return null;
             });
 
-            closeClientFuture.thenAccept(delete -> {
-                CompletableFuture<Void> deleteTopicAuthenticationFuture = new 
CompletableFuture<>();
-                brokerService.deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
-                deleteTopicAuthenticationFuture.thenCompose(__ -> 
deleteSchema())
-                        .thenCompose(__ -> deleteTopicPolicies())
-                        .thenCompose(__ -> transactionBufferCleanupAndClose())
-                        .whenComplete((v, ex) -> {
-                            if (ex != null) {
-                                log.error("[{}] Error deleting topic", topic, 
ex);
-                                unfenceTopicToResume();
-                                deleteFuture.completeExceptionally(ex);
-                            } else {
-                                List<CompletableFuture<Void>> 
subsDeleteFutures = new ArrayList<>();
-                                subscriptions.forEach((sub, p) -> 
subsDeleteFutures.add(unsubscribe(sub)));
-
-                                
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
-                                    if (e != null) {
-                                        log.error("[{}] Error deleting topic", 
topic, e);
-                                        unfenceTopicToResume();
-                                        deleteFuture.completeExceptionally(e);
-                                    } else {
-                                        ledger.asyncDelete(new 
AsyncCallbacks.DeleteLedgerCallback() {
-                                            @Override
-                                            public void 
deleteLedgerComplete(Object ctx) {
-                                                
brokerService.removeTopicFromCache(PersistentTopic.this);
-
-                                                
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
-
-                                                
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
-
-                                                
unregisterTopicPolicyListener();
-
-                                                log.info("[{}] Topic deleted", 
topic);
-                                                deleteFuture.complete(null);
-                                            }
-
-                                            @Override
-                                            public void 
deleteLedgerFailed(ManagedLedgerException exception,
-                                                                           
Object ctx) {
-                                                if (exception.getCause()
-                                                        instanceof 
MetadataStoreException.NotFoundException) {
-                                                    log.info("[{}] Topic is 
already deleted {}",
-                                                            topic, 
exception.getMessage());
-                                                    deleteLedgerComplete(ctx);
-                                                } else {
-                                                    unfenceTopicToResume();
-                                                    log.error("[{}] Error 
deleting topic", topic, exception);
-                                                    
deleteFuture.completeExceptionally(
-                                                            new 
PersistenceException(exception));
+                closeClientFuture.thenAccept(__ -> {
+                    CompletableFuture<Void> deleteTopicAuthenticationFuture = 
new CompletableFuture<>();
+                    brokerService.deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
+                    deleteTopicAuthenticationFuture.thenCompose(ignore -> 
deleteSchema())
+                            .thenCompose(ignore -> {
+                                if 
(!this.getBrokerService().getPulsar().getBrokerService()
+                                        .isSystemTopic(TopicName.get(topic))) {
+                                    return deleteTopicPolicies();
+                                } else {
+                                    return 
CompletableFuture.completedFuture(null);
+                                }
+                                })
+                            .thenCompose(ignore -> 
transactionBufferCleanupAndClose())
+                            .whenComplete((v, ex) -> {
+                                if (ex != null) {
+                                    log.error("[{}] Error deleting topic", 
topic, ex);
+                                    unfenceTopicToResume();
+                                    deleteFuture.completeExceptionally(ex);
+                                } else {
+                                    List<CompletableFuture<Void>> 
subsDeleteFutures = new ArrayList<>();
+                                    subscriptions.forEach((sub, p) -> 
subsDeleteFutures.add(unsubscribe(sub)));
+                                    
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
+                                        if (e != null) {
+                                            log.error("[{}] Error deleting 
topic", topic, e);
+                                            unfenceTopicToResume();
+                                            
deleteFuture.completeExceptionally(e);
+                                        } else {
+                                            ledger.asyncDelete(new 
AsyncCallbacks.DeleteLedgerCallback() {
+                                                @Override
+                                                public void 
deleteLedgerComplete(Object ctx) {
+                                                    
brokerService.removeTopicFromCache(PersistentTopic.this);
+
+                                                    
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
+
+                                                    
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
+
+                                                    
unregisterTopicPolicyListener();
+
+                                                    log.info("[{}] Topic 
deleted", topic);
+                                                    
deleteFuture.complete(null);
                                                 }
-                                            }
-                                        }, null);
-                                    }
-                                });
-                            }
-                        });
-            }).exceptionally(ex->{
-                unfenceTopicToResume();
-                deleteFuture.completeExceptionally(
-                        new TopicBusyException("Failed to close clients before 
deleting topic."));
-                return null;
-            });
+
+                                                @Override
+                                                public void 
deleteLedgerFailed(ManagedLedgerException exception,
+                                                                               
Object ctx) {
+                                                    if (exception.getCause()
+                                                            instanceof 
MetadataStoreException.NotFoundException) {
+                                                        log.info("[{}] Topic 
is already deleted {}",
+                                                                topic, 
exception.getMessage());
+                                                        
deleteLedgerComplete(ctx);
+                                                    } else {
+                                                        unfenceTopicToResume();
+                                                        log.error("[{}] Error 
deleting topic", topic, exception);
+                                                        
deleteFuture.completeExceptionally(
+                                                                new 
PersistenceException(exception));
+                                                    }
+                                                }
+                                            }, null);
+                                         }
+                                    });
+                                }
+                            });
+                }).exceptionally(ex->{
+                    unfenceTopicToResume();
+                    deleteFuture.completeExceptionally(
+                            new TopicBusyException("Failed to close clients 
before deleting topic."));
+                    return null;
+                });
         } finally {
             lock.writeLock().unlock();
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 93ac82ade44..dc4df8330cc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -70,6 +70,7 @@ import 
org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
 import org.apache.pulsar.broker.service.AbstractTopic;
+import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
@@ -85,6 +86,8 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
@@ -111,6 +114,7 @@ import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -1389,8 +1393,8 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
                 
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
                 OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
         ledgerConf.setLedgerOffloader(offloader);
-        
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
-                new Long(-1));
+        
assertEquals(Long.compare(ledgerConf.getLedgerOffloader().getOffloadPolicies()
+                .getManagedLedgerOffloadThresholdInBytes(), -1L), 0);
 
         // set an override for the namespace
         admin.namespaces().setOffloadThreshold(namespace, 100);
@@ -1406,8 +1410,8 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
                 
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
                 OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
         ledgerConf.setLedgerOffloader(offloader);
-        
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
-                new Long(100));
+        
assertEquals(Long.compare(ledgerConf.getLedgerOffloader().getOffloadPolicies()
+                .getManagedLedgerOffloadThresholdInBytes(), 100L), 0);
 
         // set another negative value to disable
         admin.namespaces().setOffloadThreshold(namespace, -2);
@@ -1422,8 +1426,8 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
                 
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
                 OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
         ledgerConf.setLedgerOffloader(offloader);
-        
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
-                new Long(-2));
+        
assertEquals(Long.compare(ledgerConf.getLedgerOffloader().getOffloadPolicies()
+                .getManagedLedgerOffloadThresholdInBytes(), -2L), 0);
 
         // set back to -1 and fall back to default
         admin.namespaces().setOffloadThreshold(namespace, -1);
@@ -1438,8 +1442,8 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
                 
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
                 OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
         ledgerConf.setLedgerOffloader(offloader);
-        
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
-                new Long(-1));
+        
assertEquals(Long.compare(ledgerConf.getLedgerOffloader().getOffloadPolicies()
+                .getManagedLedgerOffloadThresholdInBytes(), -1L), 0);
 
         // cleanup
         admin.topics().delete(topicName.toString(), true);
@@ -1881,4 +1885,65 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void testFinallyDeleteSystemTopicWhenDeleteNamespace() throws 
Exception {
+        String namespace = this.testTenant + "/delete-namespace";
+        String topic = TopicName.get(TopicDomain.persistent.toString(), 
this.testTenant, "delete-namespace",
+                "testFinallyDeleteSystemTopicWhenDeleteNamespace").toString();
+
+        // 0. enable topic level polices and system topic
+        pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+        pulsar.getConfig().setSystemTopicEnabled(true);
+        pulsar.getConfig().setForceDeleteNamespaceAllowed(true);
+        Field policesService = 
pulsar.getClass().getDeclaredField("topicPoliciesService");
+        policesService.setAccessible(true);
+        policesService.set(pulsar, new 
SystemTopicBasedTopicPoliciesService(pulsar));
+
+        // 1. create a test namespace.
+        admin.namespaces().createNamespace(namespace);
+        // 2. create a test topic.
+        admin.topics().createNonPartitionedTopic(topic);
+        // 3. change policy of the topic.
+        admin.topicPolicies().setMaxConsumers(topic, 5);
+        // 4. change the order of the topics in this namespace.
+        List<String> topics = 
pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(namespace)).get();
+        Assert.assertTrue(topics.size() >= 2);
+        for (int i = 0; i < topics.size(); i++) {
+            if 
(topics.get(i).contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)) {
+                String systemTopic = topics.get(i);
+                topics.set(i, topics.get(0));
+                topics.set(0, systemTopic);
+            }
+        }
+        NamespaceService mockNamespaceService = 
spy(pulsar.getNamespaceService());
+        Field namespaceServiceField = 
pulsar.getClass().getDeclaredField("nsService");
+        namespaceServiceField.setAccessible(true);
+        namespaceServiceField.set(pulsar, mockNamespaceService);
+        
doReturn(CompletableFuture.completedFuture(topics)).when(mockNamespaceService).getFullListOfTopics(any());
+        // 5. delete the namespace
+        admin.namespaces().deleteNamespace(namespace, true);
+
+    }
+
+    @Test
+    public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws 
Exception {
+        String namespace = this.testTenant + "/delete-systemTopic";
+        String topic = TopicName.get(TopicDomain.persistent.toString(), 
this.testTenant, "delete-systemTopic",
+                "testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
+
+        // 0. enable topic level polices and system topic
+        pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+        pulsar.getConfig().setSystemTopicEnabled(true);
+        Field policesService = 
pulsar.getClass().getDeclaredField("topicPoliciesService");
+        policesService.setAccessible(true);
+        policesService.set(pulsar, new 
SystemTopicBasedTopicPoliciesService(pulsar));
+        // 1. create a test namespace.
+        admin.namespaces().createNamespace(namespace);
+        // 2. create a test topic.
+        admin.topics().createNonPartitionedTopic(topic);
+        // 3. change policy of the topic.
+        admin.topicPolicies().setMaxConsumers(topic, 5);
+        // 4. delete the policies topic and the topic wil not to clear topic 
polices
+        admin.topics().delete(namespace + "/" + 
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index d43221a64e2..0d1bbda4568 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -89,35 +89,6 @@ public class TransactionProduceTest extends 
TransactionTestBase {
         produceTest(true);
     }
 
-    @Test
-    public void testDeleteNamespaceBeforeCommit() throws Exception {
-        final String topic = NAMESPACE1 + "/testDeleteTopicBeforeCommit";
-        PulsarClient pulsarClient = this.pulsarClient;
-        Transaction tnx = pulsarClient.newTransaction()
-                .withTransactionTimeout(60, TimeUnit.SECONDS)
-                .build().get();
-        long txnIdMostBits = ((TransactionImpl) tnx).getTxnIdMostBits();
-        long txnIdLeastBits = ((TransactionImpl) tnx).getTxnIdLeastBits();
-        Assert.assertTrue(txnIdMostBits > -1);
-        Assert.assertTrue(txnIdLeastBits > -1);
-
-        @Cleanup
-        Producer<byte[]> outProducer = pulsarClient
-                .newProducer()
-                .topic(topic)
-                .sendTimeout(0, TimeUnit.SECONDS)
-                .enableBatching(false)
-                .create();
-
-        String content = "Hello Txn";
-        outProducer.newMessage(tnx).value(content.getBytes(UTF_8)).send();
-
-        try {
-            deleteNamespaceGraceFully(NAMESPACE1, true);
-        } catch (Exception ignore) {}
-        tnx.commit().get();
-    }
-
     @Test
     public void produceAndAbortTest() throws Exception {
         produceTest(false);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 307244a6447..f0417575446 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -166,7 +166,8 @@ public class TransactionTest extends TransactionTestBase {
     public void testCreateTransactionSystemTopic() throws Exception {
         String subName = "test";
         String topicName = TopicName.get(NAMESPACE1 + "/" + 
"testCreateTransactionSystemTopic").toString();
-
+        admin.namespaces().deleteNamespace(NAMESPACE1, true);
+        admin.namespaces().createNamespace(NAMESPACE1);
         try {
             // init pending ack
             @Cleanup
@@ -182,7 +183,7 @@ public class TransactionTest extends TransactionTestBase {
 
         // getList does not include transaction system topic
         List<String> list = admin.topics().getList(NAMESPACE1);
-        assertEquals(list.size(), 2);
+        assertFalse(list.isEmpty());
         list.forEach(topic -> 
assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));
 
         try {

Reply via email to