This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 721275a5011 [fix][broker] Fix namespace deletion if __change_events
topic has not been created yet (#18804)
721275a5011 is described below
commit 721275a5011d295e1b1f7e8ae811f393d81aaaf9
Author: Nicolò Boschi <[email protected]>
AuthorDate: Tue Dec 13 20:07:57 2022 +0100
[fix][broker] Fix namespace deletion if __change_events topic has not been
created yet (#18804)
---
.../pulsar/broker/service/BrokerService.java | 12 +-
.../SystemTopicBasedTopicPoliciesService.java | 134 ++++++++++++---------
.../apache/pulsar/broker/admin/AdminApi2Test.java | 8 +-
.../apache/pulsar/broker/admin/AdminApiTest.java | 43 +++++--
.../SystemTopicBasedTopicPoliciesServiceTest.java | 12 ++
5 files changed, 129 insertions(+), 80 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7dcb9a6968f..588ea391c5e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -3217,16 +3217,8 @@ public class BrokerService implements Closeable {
if (!pulsarService.getConfig().isTopicLevelPoliciesEnabled()) {
return CompletableFuture.completedFuture(null);
}
- return pulsarService.getPulsarResources().getNamespaceResources()
- .getPoliciesAsync(topicName.getNamespaceObject())
- .thenCompose(optPolicies -> {
- if (optPolicies.isPresent() && optPolicies.get().deleted) {
- // We can return the completed future directly if the
namespace is already deleted.
- return CompletableFuture.completedFuture(null);
- }
- TopicName cloneTopicName =
TopicName.get(topicName.getPartitionedTopicName());
- return
pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName);
- });
+ return pulsar.getTopicPoliciesService()
+
.deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName()));
}
private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName
topicName, int numPartitions) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 98c89d0226c..8d25603ccc5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -106,49 +106,53 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
return CompletableFuture.failedFuture(
new BrokerServiceException.NotAllowedException("Not
allowed to send event to health check topic"));
}
- CompletableFuture<Void> result = new CompletableFuture<>();
- try {
- createSystemTopicFactoryIfNeeded();
- } catch (PulsarServerException e) {
- result.completeExceptionally(e);
- return result;
- }
+ return pulsarService.getPulsarResources().getNamespaceResources()
+ .getPoliciesAsync(topicName.getNamespaceObject())
+ .thenCompose(namespacePolicies -> {
+ if (namespacePolicies.isPresent() &&
namespacePolicies.get().deleted) {
+ log.debug("[{}] skip sending topic policy event since
the namespace is deleted", topicName);
+ return CompletableFuture.completedFuture(null);
+ }
- SystemTopicClient<PulsarEvent> systemTopicClient =
-
namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
+ try {
+ createSystemTopicFactoryIfNeeded();
+ } catch (PulsarServerException e) {
+ return CompletableFuture.failedFuture(e);
+ }
- CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writerFuture
= systemTopicClient.newWriterAsync();
- writerFuture.whenComplete((writer, ex) -> {
- if (ex != null) {
- result.completeExceptionally(ex);
- } else {
- PulsarEvent event = getPulsarEvent(topicName, actionType,
policies);
- CompletableFuture<MessageId> actionFuture =
- ActionType.DELETE.equals(actionType) ?
writer.deleteAsync(event) : writer.writeAsync(event);
- actionFuture.whenComplete(((messageId, e) -> {
- if (e != null) {
- result.completeExceptionally(e);
- } else {
- if (messageId != null) {
- result.complete(null);
- } else {
- result.completeExceptionally(new
RuntimeException("Got message id is null."));
- }
- }
- writer.closeAsync().whenComplete((v, cause) -> {
- if (cause != null) {
- log.error("[{}] Close writer error.",
topicName, cause);
+ SystemTopicClient<PulsarEvent> systemTopicClient =
namespaceEventsSystemTopicFactory
+
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
+
+ return systemTopicClient.newWriterAsync()
+ .thenCompose(writer -> {
+ PulsarEvent event = getPulsarEvent(topicName,
actionType, policies);
+ CompletableFuture<MessageId> writeFuture =
+ ActionType.DELETE.equals(actionType) ?
writer.deleteAsync(event)
+ : writer.writeAsync(event);
+ return writeFuture.handle((messageId, e) -> {
+ if (e != null) {
+ return CompletableFuture.failedFuture(e);
} else {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Close writer
success.", topicName);
+ if (messageId != null) {
+ return
CompletableFuture.completedFuture(null);
+ } else {
+ return CompletableFuture.failedFuture(
+ new RuntimeException("Got
message id is null."));
}
}
- });
- })
- );
- }
- });
- return result;
+ }).thenRun(() ->
+ writer.closeAsync().whenComplete((v,
cause) -> {
+ if (cause != null) {
+ log.error("[{}] Close writer
error.", topicName, cause);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Close
writer success.", topicName);
+ }
+ }
+ })
+ );
+ });
+ });
}
private PulsarEvent getPulsarEvent(TopicName topicName, ActionType
actionType, TopicPolicies policies) {
@@ -418,25 +422,25 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent>
reader) {
reader.readNextAsync()
- .thenAccept(msg -> {
- refreshTopicPoliciesCache(msg);
- notifyListener(msg);
- })
- .whenComplete((__, ex) -> {
- if (ex == null) {
- readMorePolicies(reader);
- } else {
- Throwable cause =
FutureUtil.unwrapCompletionException(ex);
- if (cause instanceof
PulsarClientException.AlreadyClosedException) {
- log.error("Read more topic policies exception, close
the read now!", ex);
- cleanCacheAndCloseReader(
-
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
- } else {
- log.warn("Read more topic polices exception, read
again.", ex);
- readMorePolicies(reader);
- }
- }
- });
+ .thenAccept(msg -> {
+ refreshTopicPoliciesCache(msg);
+ notifyListener(msg);
+ })
+ .whenComplete((__, ex) -> {
+ if (ex == null) {
+ readMorePolicies(reader);
+ } else {
+ Throwable cause =
FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof
PulsarClientException.AlreadyClosedException) {
+ log.warn("Read more topic policies exception,
close the read now!", ex);
+ cleanCacheAndCloseReader(
+
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
+ } else {
+ log.warn("Read more topic polices exception, read
again.", ex);
+ readMorePolicies(reader);
+ }
+ }
+ });
}
private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
@@ -504,7 +508,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
if (message instanceof MessageImpl) {
return ((MessageImpl<?>) message).hasReplicateTo()
? (((MessageImpl<?>) message).getReplicateTo().size() == 1
- ? !((MessageImpl<?>)
message).getReplicateTo().contains(clusterName) : true)
+ ? !((MessageImpl<?>)
message).getReplicateTo().contains(clusterName) : true)
: false;
}
if (message instanceof TopicMessageImpl) {
@@ -570,6 +574,20 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
});
}
+ public static String getEventKey(PulsarEvent event) {
+ return TopicName.get(event.getTopicPoliciesEvent().getDomain(),
+ event.getTopicPoliciesEvent().getTenant(),
+ event.getTopicPoliciesEvent().getNamespace(),
+ event.getTopicPoliciesEvent().getTopic()).toString();
+ }
+
+ public static String getEventKey(TopicName topicName) {
+ return TopicName.get(topicName.getDomain().toString(),
+ topicName.getTenant(),
+ topicName.getNamespace(),
+
TopicName.get(topicName.getPartitionedTopicName()).getLocalName()).toString();
+ }
+
@VisibleForTesting
long getPoliciesCacheSize() {
return policiesCache.size();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 3523c56c0da..45b5134de6b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -1483,7 +1483,12 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
// create namespace2
String namespace = tenant + "/test-ns2";
- admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+ admin.namespaces().createNamespace(namespace, Set.of("test"));
+ admin.topics().createNonPartitionedTopic(namespace + "/tobedeleted");
+ // verify namespace can be deleted even without topic policy events
+ admin.namespaces().deleteNamespace(namespace, true);
+
+ admin.namespaces().createNamespace(namespace, Set.of("test"));
// create topic
String topic = namespace + "/test-topic2";
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
@@ -1717,7 +1722,6 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
@Test
public void testForceDeleteNamespace() throws Exception {
- conf.setForceDeleteNamespaceAllowed(true);
final String namespaceName = "prop-xyz2/ns1";
TenantInfoImpl tenantInfo = new
TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant("prop-xyz2", tenantInfo);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 4022daca02b..c349e902882 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -171,6 +171,10 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
@BeforeMethod
@Override
public void setup() throws Exception {
+ setupConfigAndStart(null);
+ }
+
+ private void applyDefaultConfig() {
conf.setSystemTopicEnabled(false);
conf.setTopicLevelPoliciesEnabled(false);
conf.setLoadBalancerEnabled(true);
@@ -182,6 +186,13 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
conf.setSubscriptionExpiryCheckIntervalInMinutes(1);
conf.setBrokerDeleteInactiveTopicsEnabled(false);
conf.setNumExecutorThreadPoolSize(5);
+ }
+
+ private void
setupConfigAndStart(java.util.function.Consumer<ServiceConfiguration>
configurationConsumer) throws Exception {
+ applyDefaultConfig();
+ if (configurationConsumer != null) {
+ configurationConsumer.accept(conf);
+ }
super.internalSetup();
@@ -1751,7 +1762,10 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
@Test
public void
testNamespaceSplitBundleWithDefaultTopicCountEquallyDivideAlgorithm() throws
Exception {
-
conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE);
+ cleanup();
+ setupConfigAndStart(conf -> conf
+
.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE));
+
// Force to create a topic
final String namespace = "prop-xyz/ns1";
List<String> topicNames = Lists.newArrayList(
@@ -1784,8 +1798,9 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
for (int i = 0; i < bundles.getBundles().size(); i++) {
assertNotEquals(bundles.getBundles().get(i).toString(),
splitRange[i]);
}
- producers.forEach(Producer::closeAsync);
-
conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME);
+ for (Producer<byte[]> producer : producers) {
+ producer.close();
+ }
}
@Test
@@ -3265,7 +3280,8 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
@Test
public void testGetTtlDurationDefaultInSeconds() throws Exception {
- conf.setTtlDurationDefaultInSeconds(3600);
+ cleanup();
+ setupConfigAndStart(conf -> conf.setTtlDurationDefaultInSeconds(3600));
Integer seconds =
admin.namespaces().getPolicies("prop-xyz/ns1").message_ttl_in_seconds;
assertNull(seconds);
}
@@ -3308,8 +3324,11 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
final String topic =
"persistent://prop-xyz/ns1/testPartitionedTopicMsgDelayedAggregated-" +
UUID.randomUUID().toString();
final String subName = "my-sub";
final int numPartitions = 2;
- conf.setSubscriptionRedeliveryTrackerEnabled(true);
- conf.setDelayedDeliveryEnabled(true);
+ cleanup();
+ setupConfigAndStart(conf -> {
+ conf.setSubscriptionRedeliveryTrackerEnabled(true);
+ conf.setDelayedDeliveryEnabled(true);
+ });
admin.topics().createPartitionedTopic(topic, numPartitions);
for (int i = 0; i < 2; i++) {
@@ -3364,7 +3383,7 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 20000)
public void testPartitionedTopicTruncate() throws Exception {
- final String topicName =
"persistent://prop-xyz/ns1/testTruncateTopic-" + UUID.randomUUID().toString();
+ final String topicName =
"persistent://prop-xyz/ns1/testTruncateTopic2-" + UUID.randomUUID().toString();
final String subName = "my-sub";
admin.topics().createPartitionedTopic(topicName,6);
admin.namespaces().setRetention("prop-xyz/ns1", new
RetentionPolicies(60, 50));
@@ -3384,9 +3403,13 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 20000)
public void testNonPartitionedTopicTruncate() throws Exception {
- final String topicName =
"persistent://prop-xyz/ns1/testTruncateTopic-" + UUID.randomUUID().toString();
+ final String topicName =
"persistent://prop-xyz/ns1/testTruncateTopic1-" + UUID.randomUUID().toString();
final String subName = "my-sub";
- this.conf.setTopicLevelPoliciesEnabled(true);
+ cleanup();
+ setupConfigAndStart(conf -> {
+ conf.setTopicLevelPoliciesEnabled(true);
+ conf.setSystemTopicEnabled(true);
+ });
admin.topics().createNonPartitionedTopic(topicName);
admin.namespaces().setRetention("prop-xyz/ns1", new
RetentionPolicies(60, 50));
List<MessageId> messageIds =
publishMessagesOnPersistentTopic(topicName, 10);
@@ -3402,7 +3425,7 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 20000)
public void testNonPersistentTopicTruncate() throws Exception {
- final String topicName =
"non-persistent://prop-xyz/ns1/testTruncateTopic-" +
UUID.randomUUID().toString();
+ final String topicName =
"non-persistent://prop-xyz/ns1/testTruncateTopic3-" +
UUID.randomUUID().toString();
admin.topics().createNonPartitionedTopic(topicName);
assertThrows(() -> {admin.topics().truncate(topicName);});
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index f4ad2ebd91c..4f3feb778af 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -379,6 +379,18 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
});
}
+
+ @Test
+ public void testHandleNamespaceBeingDeleted() throws Exception {
+ SystemTopicBasedTopicPoliciesService service =
(SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
+
pulsar.getPulsarResources().getNamespaceResources().setPolicies(NamespaceName.get(NAMESPACE1),
+ old -> {
+ old.deleted = true;
+ return old;
+ });
+ service.deleteTopicPoliciesAsync(TOPIC1).get();
+ }
+
@Test
public void testGetTopicPoliciesWithCleanCache() throws Exception {
final String topic = "persistent://" + NAMESPACE1 + "/test" +
UUID.randomUUID();