This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 8138553cefb [cherry-pick][branch-2.11] cherry-pick fixing can not
delete namespace by force (#18307) (#18826)
8138553cefb is described below
commit 8138553cefbcab3614b2baa24241cb95388e125b
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri Dec 9 20:38:17 2022 +0800
[cherry-pick][branch-2.11] cherry-pick fixing can not delete namespace by
force (#18307) (#18826)
### Motivation
Cherry-pick (#18307) to release 2.11.1.
### Modifications
Cherry-pick (#18307) to release 2.11.1.
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 84 +++++++++++---
.../broker/service/persistent/PersistentTopic.java | 128 +++++++++++----------
.../apache/pulsar/broker/admin/NamespacesTest.java | 81 +++++++++++--
.../broker/transaction/TransactionProduceTest.java | 29 -----
.../pulsar/broker/transaction/TransactionTest.java | 5 +-
5 files changed, 212 insertions(+), 115 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 4d8f49be965..60d171a9819 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -50,6 +50,7 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang.mutable.MutableObject;
import org.apache.commons.lang3.StringUtils;
@@ -62,6 +63,7 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
@@ -471,16 +473,26 @@ public abstract class NamespacesBase extends
AdminResource {
if (!topics.isEmpty()) {
Set<String> partitionedTopics = new HashSet<>();
Set<String> nonPartitionedTopics = new HashSet<>();
+ Set<String> allSystemTopics = new HashSet<>();
+ Set<String> allPartitionedSystemTopics = new HashSet<>();
for (String topic : topics) {
try {
TopicName topicName = TopicName.get(topic);
if (topicName.isPartitioned()) {
+ if
(pulsar().getBrokerService().isSystemTopic(topicName)) {
+
allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
+ continue;
+ }
String partitionedTopic =
topicName.getPartitionedTopicName();
if (!partitionedTopics.contains(partitionedTopic))
{
partitionedTopics.add(partitionedTopic);
}
} else {
+ if
(pulsar().getBrokerService().isSystemTopic(topicName)) {
+ allSystemTopics.add(topic);
+ continue;
+ }
nonPartitionedTopics.add(topic);
}
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true));
@@ -508,21 +520,24 @@ public abstract class NamespacesBase extends
AdminResource {
}
final CompletableFuture<Throwable> topicFutureEx =
- FutureUtil.waitForAll(topicFutures).handle((result,
exception) -> {
- if (exception != null) {
- if (exception.getCause() instanceof
PulsarAdminException) {
- asyncResponse
- .resume(new
RestException((PulsarAdminException) exception.getCause()));
- } else {
- log.error("[{}] Failed to remove
forcefully owned namespace {}",
- clientAppId(), namespaceName,
exception);
- asyncResponse.resume(new
RestException(exception.getCause()));
- }
- return exception;
- }
-
- return null;
- });
+ FutureUtil.waitForAll(topicFutures)
+ .thenCompose((ignore) ->
internalDeleteTopicsAsync(allSystemTopics))
+ .thenCompose((ignore) ->
+
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+ .handle((result, exception) -> {
+ if (exception != null) {
+ if (exception.getCause() instanceof
PulsarAdminException) {
+ asyncResponse.resume(
+ new
RestException((PulsarAdminException) exception.getCause()));
+ } else {
+ log.error("[{}] Failed to remove
forcefully owned namespace {}",
+ clientAppId(),
namespaceName, exception);
+ asyncResponse.resume(new
RestException(exception.getCause()));
+ }
+ return exception;
+ }
+ return null;
+ });
if (topicFutureEx.join() != null) {
return;
}
@@ -564,6 +579,45 @@ public abstract class NamespacesBase extends AdminResource
{
});
}
+ private CompletableFuture<Void>
internalDeletePartitionedTopicsAsync(Set<String> topicNames) {
+ log.info("internalDeletePartitionedTopicsAsync");
+ if (CollectionUtils.isEmpty(topicNames)) {
+ return CompletableFuture.completedFuture(null);
+ }
+ PulsarAdmin admin;
+ try {
+ admin = pulsar().getAdminClient();
+ } catch (Exception ex) {
+ log.error("[{}] Get admin client error when preparing to delete
topics.", clientAppId(), ex);
+ return FutureUtil.failedFuture(ex);
+ }
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ for (String topicName : topicNames) {
+ TopicName tn = TopicName.get(topicName);
+ futures.add(admin.topics().deletePartitionedTopicAsync(topicName,
true, true));
+ }
+ return FutureUtil.waitForAll(futures);
+ }
+ private CompletableFuture<Void> internalDeleteTopicsAsync(Set<String>
topicNames) {
+ log.info("internalDeleteTopicsAsync");
+ if (CollectionUtils.isEmpty(topicNames)) {
+ return CompletableFuture.completedFuture(null);
+ }
+ PulsarAdmin admin;
+ try {
+ admin = pulsar().getAdminClient();
+ } catch (Exception ex) {
+ log.error("[{}] Get admin client error when preparing to delete
topics.", clientAppId(), ex);
+ return FutureUtil.failedFuture(ex);
+ }
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ for (String topicName : topicNames) {
+ futures.add(admin.topics().deleteAsync(topicName, true, true));
+ }
+ return FutureUtil.waitForAll(futures);
+ }
+
+
@SuppressWarnings("deprecation")
protected CompletableFuture<Void>
internalDeleteNamespaceBundleAsync(String bundleRange, boolean authoritative,
boolean force) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1d1160d7dfb..5ad6891d30c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1178,68 +1178,74 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return null;
});
- closeClientFuture.thenAccept(delete -> {
- CompletableFuture<Void> deleteTopicAuthenticationFuture = new
CompletableFuture<>();
- brokerService.deleteTopicAuthenticationWithRetry(topic,
deleteTopicAuthenticationFuture, 5);
- deleteTopicAuthenticationFuture.thenCompose(__ ->
deleteSchema())
- .thenCompose(__ -> deleteTopicPolicies())
- .thenCompose(__ -> transactionBufferCleanupAndClose())
- .whenComplete((v, ex) -> {
- if (ex != null) {
- log.error("[{}] Error deleting topic", topic,
ex);
- unfenceTopicToResume();
- deleteFuture.completeExceptionally(ex);
- } else {
- List<CompletableFuture<Void>>
subsDeleteFutures = new ArrayList<>();
- subscriptions.forEach((sub, p) ->
subsDeleteFutures.add(unsubscribe(sub)));
-
-
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
- if (e != null) {
- log.error("[{}] Error deleting topic",
topic, e);
- unfenceTopicToResume();
- deleteFuture.completeExceptionally(e);
- } else {
- ledger.asyncDelete(new
AsyncCallbacks.DeleteLedgerCallback() {
- @Override
- public void
deleteLedgerComplete(Object ctx) {
-
brokerService.removeTopicFromCache(PersistentTopic.this);
-
-
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
-
-
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
-
-
unregisterTopicPolicyListener();
-
- log.info("[{}] Topic deleted",
topic);
- deleteFuture.complete(null);
- }
-
- @Override
- public void
deleteLedgerFailed(ManagedLedgerException exception,
-
Object ctx) {
- if (exception.getCause()
- instanceof
MetadataStoreException.NotFoundException) {
- log.info("[{}] Topic is
already deleted {}",
- topic,
exception.getMessage());
- deleteLedgerComplete(ctx);
- } else {
- unfenceTopicToResume();
- log.error("[{}] Error
deleting topic", topic, exception);
-
deleteFuture.completeExceptionally(
- new
PersistenceException(exception));
+ closeClientFuture.thenAccept(__ -> {
+ CompletableFuture<Void> deleteTopicAuthenticationFuture =
new CompletableFuture<>();
+ brokerService.deleteTopicAuthenticationWithRetry(topic,
deleteTopicAuthenticationFuture, 5);
+ deleteTopicAuthenticationFuture.thenCompose(ignore ->
deleteSchema())
+ .thenCompose(ignore -> {
+ if
(!this.getBrokerService().getPulsar().getBrokerService()
+ .isSystemTopic(TopicName.get(topic))) {
+ return deleteTopicPolicies();
+ } else {
+ return
CompletableFuture.completedFuture(null);
+ }
+ })
+ .thenCompose(ignore ->
transactionBufferCleanupAndClose())
+ .whenComplete((v, ex) -> {
+ if (ex != null) {
+ log.error("[{}] Error deleting topic",
topic, ex);
+ unfenceTopicToResume();
+ deleteFuture.completeExceptionally(ex);
+ } else {
+ List<CompletableFuture<Void>>
subsDeleteFutures = new ArrayList<>();
+ subscriptions.forEach((sub, p) ->
subsDeleteFutures.add(unsubscribe(sub)));
+
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
+ if (e != null) {
+ log.error("[{}] Error deleting
topic", topic, e);
+ unfenceTopicToResume();
+
deleteFuture.completeExceptionally(e);
+ } else {
+ ledger.asyncDelete(new
AsyncCallbacks.DeleteLedgerCallback() {
+ @Override
+ public void
deleteLedgerComplete(Object ctx) {
+
brokerService.removeTopicFromCache(PersistentTopic.this);
+
+
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
+
+
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
+
+
unregisterTopicPolicyListener();
+
+ log.info("[{}] Topic
deleted", topic);
+
deleteFuture.complete(null);
}
- }
- }, null);
- }
- });
- }
- });
- }).exceptionally(ex->{
- unfenceTopicToResume();
- deleteFuture.completeExceptionally(
- new TopicBusyException("Failed to close clients before
deleting topic."));
- return null;
- });
+
+ @Override
+ public void
deleteLedgerFailed(ManagedLedgerException exception,
+
Object ctx) {
+ if (exception.getCause()
+ instanceof
MetadataStoreException.NotFoundException) {
+ log.info("[{}] Topic
is already deleted {}",
+ topic,
exception.getMessage());
+
deleteLedgerComplete(ctx);
+ } else {
+ unfenceTopicToResume();
+ log.error("[{}] Error
deleting topic", topic, exception);
+
deleteFuture.completeExceptionally(
+ new
PersistenceException(exception));
+ }
+ }
+ }, null);
+ }
+ });
+ }
+ });
+ }).exceptionally(ex->{
+ unfenceTopicToResume();
+ deleteFuture.completeExceptionally(
+ new TopicBusyException("Failed to close clients
before deleting topic."));
+ return null;
+ });
} finally {
lock.writeLock().unlock();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 93ac82ade44..dc4df8330cc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -70,6 +70,7 @@ import
org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.service.AbstractTopic;
+import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
@@ -85,6 +86,8 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
@@ -111,6 +114,7 @@ import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -1389,8 +1393,8 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
-
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
- new Long(-1));
+
assertEquals(Long.compare(ledgerConf.getLedgerOffloader().getOffloadPolicies()
+ .getManagedLedgerOffloadThresholdInBytes(), -1L), 0);
// set an override for the namespace
admin.namespaces().setOffloadThreshold(namespace, 100);
@@ -1406,8 +1410,8 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
-
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
- new Long(100));
+
assertEquals(Long.compare(ledgerConf.getLedgerOffloader().getOffloadPolicies()
+ .getManagedLedgerOffloadThresholdInBytes(), 100L), 0);
// set another negative value to disable
admin.namespaces().setOffloadThreshold(namespace, -2);
@@ -1422,8 +1426,8 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
-
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
- new Long(-2));
+
assertEquals(Long.compare(ledgerConf.getLedgerOffloader().getOffloadPolicies()
+ .getManagedLedgerOffloadThresholdInBytes(), -2L), 0);
// set back to -1 and fall back to default
admin.namespaces().setOffloadThreshold(namespace, -1);
@@ -1438,8 +1442,8 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
-
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
- new Long(-1));
+
assertEquals(Long.compare(ledgerConf.getLedgerOffloader().getOffloadPolicies()
+ .getManagedLedgerOffloadThresholdInBytes(), -1L), 0);
// cleanup
admin.topics().delete(topicName.toString(), true);
@@ -1881,4 +1885,65 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
}
}
+ @Test
+ public void testFinallyDeleteSystemTopicWhenDeleteNamespace() throws
Exception {
+ String namespace = this.testTenant + "/delete-namespace";
+ String topic = TopicName.get(TopicDomain.persistent.toString(),
this.testTenant, "delete-namespace",
+ "testFinallyDeleteSystemTopicWhenDeleteNamespace").toString();
+
+ // 0. enable topic level policies and system topic
+ pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+ pulsar.getConfig().setSystemTopicEnabled(true);
+ pulsar.getConfig().setForceDeleteNamespaceAllowed(true);
+ Field policesService =
pulsar.getClass().getDeclaredField("topicPoliciesService");
+ policesService.setAccessible(true);
+ policesService.set(pulsar, new
SystemTopicBasedTopicPoliciesService(pulsar));
+
+ // 1. create a test namespace.
+ admin.namespaces().createNamespace(namespace);
+ // 2. create a test topic.
+ admin.topics().createNonPartitionedTopic(topic);
+ // 3. change policy of the topic.
+ admin.topicPolicies().setMaxConsumers(topic, 5);
+ // 4. change the order of the topics in this namespace.
+ List<String> topics =
pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(namespace)).get();
+ Assert.assertTrue(topics.size() >= 2);
+ for (int i = 0; i < topics.size(); i++) {
+ if
(topics.get(i).contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)) {
+ String systemTopic = topics.get(i);
+ topics.set(i, topics.get(0));
+ topics.set(0, systemTopic);
+ }
+ }
+ NamespaceService mockNamespaceService =
spy(pulsar.getNamespaceService());
+ Field namespaceServiceField =
pulsar.getClass().getDeclaredField("nsService");
+ namespaceServiceField.setAccessible(true);
+ namespaceServiceField.set(pulsar, mockNamespaceService);
+
doReturn(CompletableFuture.completedFuture(topics)).when(mockNamespaceService).getFullListOfTopics(any());
+ // 5. delete the namespace
+ admin.namespaces().deleteNamespace(namespace, true);
+
+ }
+
+ @Test
+ public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws
Exception {
+ String namespace = this.testTenant + "/delete-systemTopic";
+ String topic = TopicName.get(TopicDomain.persistent.toString(),
this.testTenant, "delete-systemTopic",
+ "testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
+
+ // 0. enable topic level policies and system topic
+ pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+ pulsar.getConfig().setSystemTopicEnabled(true);
+ Field policesService =
pulsar.getClass().getDeclaredField("topicPoliciesService");
+ policesService.setAccessible(true);
+ policesService.set(pulsar, new
SystemTopicBasedTopicPoliciesService(pulsar));
+ // 1. create a test namespace.
+ admin.namespaces().createNamespace(namespace);
+ // 2. create a test topic.
+ admin.topics().createNonPartitionedTopic(topic);
+ // 3. change policy of the topic.
+ admin.topicPolicies().setMaxConsumers(topic, 5);
+ // 4. delete the policies topic and the topic will not clear topic
policies
+ admin.topics().delete(namespace + "/" +
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index d43221a64e2..0d1bbda4568 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -89,35 +89,6 @@ public class TransactionProduceTest extends
TransactionTestBase {
produceTest(true);
}
- @Test
- public void testDeleteNamespaceBeforeCommit() throws Exception {
- final String topic = NAMESPACE1 + "/testDeleteTopicBeforeCommit";
- PulsarClient pulsarClient = this.pulsarClient;
- Transaction tnx = pulsarClient.newTransaction()
- .withTransactionTimeout(60, TimeUnit.SECONDS)
- .build().get();
- long txnIdMostBits = ((TransactionImpl) tnx).getTxnIdMostBits();
- long txnIdLeastBits = ((TransactionImpl) tnx).getTxnIdLeastBits();
- Assert.assertTrue(txnIdMostBits > -1);
- Assert.assertTrue(txnIdLeastBits > -1);
-
- @Cleanup
- Producer<byte[]> outProducer = pulsarClient
- .newProducer()
- .topic(topic)
- .sendTimeout(0, TimeUnit.SECONDS)
- .enableBatching(false)
- .create();
-
- String content = "Hello Txn";
- outProducer.newMessage(tnx).value(content.getBytes(UTF_8)).send();
-
- try {
- deleteNamespaceGraceFully(NAMESPACE1, true);
- } catch (Exception ignore) {}
- tnx.commit().get();
- }
-
@Test
public void produceAndAbortTest() throws Exception {
produceTest(false);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 307244a6447..f0417575446 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -166,7 +166,8 @@ public class TransactionTest extends TransactionTestBase {
public void testCreateTransactionSystemTopic() throws Exception {
String subName = "test";
String topicName = TopicName.get(NAMESPACE1 + "/" +
"testCreateTransactionSystemTopic").toString();
-
+ admin.namespaces().deleteNamespace(NAMESPACE1, true);
+ admin.namespaces().createNamespace(NAMESPACE1);
try {
// init pending ack
@Cleanup
@@ -182,7 +183,7 @@ public class TransactionTest extends TransactionTestBase {
// getList does not include transaction system topic
List<String> list = admin.topics().getList(NAMESPACE1);
- assertEquals(list.size(), 2);
+ assertFalse(list.isEmpty());
list.forEach(topic ->
assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));
try {