This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 789122b4d90 [fix][broker] Fix namespace deletion if __change_events topic has not been created yet (#18804)
789122b4d90 is described below
commit 789122b4d90601b1be47cfd2ccab2fe3124cf907
Author: Nicolò Boschi <[email protected]>
AuthorDate: Tue Dec 13 20:07:57 2022 +0100
[fix][broker] Fix namespace deletion if __change_events topic has not been created yet (#18804)
---
.../pulsar/broker/service/BrokerService.java | 12 +-
.../SystemTopicBasedTopicPoliciesService.java | 121 +++++++++++----------
.../apache/pulsar/broker/admin/AdminApi2Test.java | 6 +-
.../apache/pulsar/broker/admin/AdminApiTest.java | 42 ++++---
.../SystemTopicBasedTopicPoliciesServiceTest.java | 11 ++
5 files changed, 109 insertions(+), 83 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c5f3d508a56..bcec8351733 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -3285,16 +3285,8 @@ public class BrokerService implements Closeable {
if (!pulsarService.getConfig().isTopicLevelPoliciesEnabled()) {
return CompletableFuture.completedFuture(null);
}
- return pulsarService.getPulsarResources().getNamespaceResources()
- .getPoliciesAsync(topicName.getNamespaceObject())
- .thenCompose(optPolicies -> {
- if (optPolicies.isPresent() && optPolicies.get().deleted) {
- // We can return the completed future directly if the
namespace is already deleted.
- return CompletableFuture.completedFuture(null);
- }
- TopicName cloneTopicName =
TopicName.get(topicName.getPartitionedTopicName());
- return
pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName);
- });
+ return pulsar.getTopicPoliciesService()
+
.deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName()));
}
private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName
topicName, int numPartitions) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index d4c41230daf..9ec374264e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -105,50 +105,53 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
return CompletableFuture.failedFuture(
new BrokerServiceException.NotAllowedException("Not
allowed to send event to health check topic"));
}
- CompletableFuture<Void> result = new CompletableFuture<>();
- try {
- createSystemTopicFactoryIfNeeded();
- } catch (PulsarServerException e) {
- result.completeExceptionally(e);
- return result;
- }
+ return pulsarService.getPulsarResources().getNamespaceResources()
+ .getPoliciesAsync(topicName.getNamespaceObject())
+ .thenCompose(namespacePolicies -> {
+ if (namespacePolicies.isPresent() &&
namespacePolicies.get().deleted) {
+ log.debug("[{}] skip sending topic policy event since
the namespace is deleted", topicName);
+ return CompletableFuture.completedFuture(null);
+ }
- SystemTopicClient<PulsarEvent> systemTopicClient =
-
namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
+ try {
+ createSystemTopicFactoryIfNeeded();
+ } catch (PulsarServerException e) {
+ return CompletableFuture.failedFuture(e);
+ }
- CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writerFuture
= systemTopicClient.newWriterAsync();
- writerFuture.whenComplete((writer, ex) -> {
- if (ex != null) {
- result.completeExceptionally(ex);
- } else {
- PulsarEvent event = getPulsarEvent(topicName, actionType,
policies);
- CompletableFuture<MessageId> actionFuture =
- ActionType.DELETE.equals(actionType) ?
writer.deleteAsync(getEventKey(event), event)
- : writer.writeAsync(getEventKey(event), event);
- actionFuture.whenComplete(((messageId, e) -> {
- if (e != null) {
- result.completeExceptionally(e);
- } else {
- if (messageId != null) {
- result.complete(null);
- } else {
- result.completeExceptionally(new
RuntimeException("Got message id is null."));
- }
- }
- writer.closeAsync().whenComplete((v, cause) -> {
- if (cause != null) {
- log.error("[{}] Close writer error.",
topicName, cause);
+ SystemTopicClient<PulsarEvent> systemTopicClient =
namespaceEventsSystemTopicFactory
+
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
+
+ return systemTopicClient.newWriterAsync()
+ .thenCompose(writer -> {
+ PulsarEvent event = getPulsarEvent(topicName,
actionType, policies);
+ CompletableFuture<MessageId> writeFuture =
+ ActionType.DELETE.equals(actionType) ?
writer.deleteAsync(getEventKey(event), event)
+ :
writer.writeAsync(getEventKey(event), event);
+ return writeFuture.handle((messageId, e) -> {
+ if (e != null) {
+ return CompletableFuture.failedFuture(e);
} else {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Close writer
success.", topicName);
+ if (messageId != null) {
+ return
CompletableFuture.completedFuture(null);
+ } else {
+ return CompletableFuture.failedFuture(
+ new RuntimeException("Got
message id is null."));
}
}
- });
- })
- );
- }
- });
- return result;
+ }).thenRun(() ->
+ writer.closeAsync().whenComplete((v,
cause) -> {
+ if (cause != null) {
+ log.error("[{}] Close writer
error.", topicName, cause);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Close
writer success.", topicName);
+ }
+ }
+ })
+ );
+ });
+ });
}
private PulsarEvent getPulsarEvent(TopicName topicName, ActionType
actionType, TopicPolicies policies) {
@@ -390,25 +393,25 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent>
reader) {
reader.readNextAsync()
- .thenAccept(msg -> {
- refreshTopicPoliciesCache(msg);
- notifyListener(msg);
- })
- .whenComplete((__, ex) -> {
- if (ex == null) {
- readMorePolicies(reader);
- } else {
- Throwable cause =
FutureUtil.unwrapCompletionException(ex);
- if (cause instanceof
PulsarClientException.AlreadyClosedException) {
- log.warn("Read more topic policies exception, close
the read now!", ex);
- cleanCacheAndCloseReader(
-
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
- } else {
- log.warn("Read more topic polices exception, read
again.", ex);
- readMorePolicies(reader);
- }
- }
- });
+ .thenAccept(msg -> {
+ refreshTopicPoliciesCache(msg);
+ notifyListener(msg);
+ })
+ .whenComplete((__, ex) -> {
+ if (ex == null) {
+ readMorePolicies(reader);
+ } else {
+ Throwable cause =
FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof
PulsarClientException.AlreadyClosedException) {
+ log.warn("Read more topic policies exception,
close the read now!", ex);
+ cleanCacheAndCloseReader(
+
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
+ } else {
+ log.warn("Read more topic polices exception, read
again.", ex);
+ readMorePolicies(reader);
+ }
+ }
+ });
}
private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
@@ -477,7 +480,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
if (message instanceof MessageImpl) {
return ((MessageImpl<?>) message).hasReplicateTo()
? (((MessageImpl<?>) message).getReplicateTo().size() == 1
- ? !((MessageImpl<?>)
message).getReplicateTo().contains(clusterName) : true)
+ ? !((MessageImpl<?>)
message).getReplicateTo().contains(clusterName) : true)
: false;
}
if (message instanceof TopicMessageImpl) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index a34e0c189bb..11c84d990f6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -1649,6 +1649,11 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
// create namespace2
String namespace = tenant + "/test-ns2";
+ admin.namespaces().createNamespace(namespace, Set.of("test"));
+ admin.topics().createNonPartitionedTopic(namespace + "/tobedeleted");
+ // verify namespace can be deleted even without topic policy events
+ admin.namespaces().deleteNamespace(namespace, true);
+
admin.namespaces().createNamespace(namespace, Set.of("test"));
// create topic
String topic = namespace + "/test-topic2";
@@ -1873,7 +1878,6 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
@Test
public void testForceDeleteNamespace() throws Exception {
- conf.setForceDeleteNamespaceAllowed(true);
final String namespaceName = "prop-xyz2/ns1";
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1",
"role2"), Set.of("test"));
admin.tenants().createTenant("prop-xyz2", tenantInfo);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index b303386c65f..fec0ce2da6e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -167,6 +167,10 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
@BeforeClass
@Override
public void setup() throws Exception {
+ setupConfigAndStart(null);
+ }
+
+ private void applyDefaultConfig() {
conf.setSystemTopicEnabled(false);
conf.setTopicLevelPoliciesEnabled(false);
conf.setLoadBalancerEnabled(true);
@@ -178,6 +182,13 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
conf.setSubscriptionExpiryCheckIntervalInMinutes(1);
conf.setBrokerDeleteInactiveTopicsEnabled(false);
conf.setNumExecutorThreadPoolSize(5);
+ }
+
+ private void
setupConfigAndStart(java.util.function.Consumer<ServiceConfiguration>
configurationConsumer) throws Exception {
+ applyDefaultConfig();
+ if (configurationConsumer != null) {
+ configurationConsumer.accept(conf);
+ }
super.internalSetup();
@@ -215,6 +226,7 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
resetConfig();
+ applyDefaultConfig();
setupClusters();
}
@@ -1718,9 +1730,9 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
@Test
public void
testNamespaceSplitBundleWithDefaultTopicCountEquallyDivideAlgorithm() throws
Exception {
cleanup();
- setup();
+ setupConfigAndStart(conf -> conf
+
.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE));
-
conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE);
// Force to create a topic
final String namespace = "prop-xyz/ns1";
List<String> topicNames = Lists.newArrayList(
@@ -1756,7 +1768,6 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
for (Producer<byte[]> producer : producers) {
producer.close();
}
-
conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME);
}
@Test
@@ -1911,9 +1922,6 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
@Test(dataProvider = "numBundles")
public void testNamespaceBundleUnload(Integer numBundles) throws Exception
{
- cleanup();
- setup();
-
admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
admin.namespaces().setNamespaceReplicationClusters("prop-xyz/ns1-bundles",
Set.of("test"));
@@ -3259,7 +3267,8 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
@Test
public void testGetTtlDurationDefaultInSeconds() throws Exception {
- conf.setTtlDurationDefaultInSeconds(3600);
+ cleanup();
+ setupConfigAndStart(conf -> conf.setTtlDurationDefaultInSeconds(3600));
Integer seconds =
admin.namespaces().getPolicies("prop-xyz/ns1").message_ttl_in_seconds;
assertNull(seconds);
}
@@ -3309,8 +3318,11 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
final String topic =
"persistent://prop-xyz/ns1/testPartitionedTopicMsgDelayedAggregated-" +
UUID.randomUUID().toString();
final String subName = "my-sub";
final int numPartitions = 2;
- conf.setSubscriptionRedeliveryTrackerEnabled(true);
- conf.setDelayedDeliveryEnabled(true);
+ cleanup();
+ setupConfigAndStart(conf -> {
+ conf.setSubscriptionRedeliveryTrackerEnabled(true);
+ conf.setDelayedDeliveryEnabled(true);
+ });
admin.topics().createPartitionedTopic(topic, numPartitions);
for (int i = 0; i < 2; i++) {
@@ -3367,7 +3379,7 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 20000)
public void testPartitionedTopicTruncate() throws Exception {
- final String topicName =
"persistent://prop-xyz/ns1/testTruncateTopic-" + UUID.randomUUID().toString();
+ final String topicName =
"persistent://prop-xyz/ns1/testTruncateTopic2-" + UUID.randomUUID().toString();
final String subName = "my-sub";
admin.topics().createPartitionedTopic(topicName,6);
admin.namespaces().setRetention("prop-xyz/ns1", new
RetentionPolicies(60, 50));
@@ -3387,9 +3399,13 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 20000)
public void testNonPartitionedTopicTruncate() throws Exception {
- final String topicName =
"persistent://prop-xyz/ns1/testTruncateTopic-" + UUID.randomUUID().toString();
+ final String topicName =
"persistent://prop-xyz/ns1/testTruncateTopic1-" + UUID.randomUUID().toString();
final String subName = "my-sub";
- this.conf.setTopicLevelPoliciesEnabled(true);
+ cleanup();
+ setupConfigAndStart(conf -> {
+ conf.setTopicLevelPoliciesEnabled(true);
+ conf.setSystemTopicEnabled(true);
+ });
admin.topics().createNonPartitionedTopic(topicName);
admin.namespaces().setRetention("prop-xyz/ns1", new
RetentionPolicies(60, 50));
List<MessageId> messageIds =
publishMessagesOnPersistentTopic(topicName, 10);
@@ -3405,7 +3421,7 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 20000)
public void testNonPersistentTopicTruncate() throws Exception {
- final String topicName =
"non-persistent://prop-xyz/ns1/testTruncateTopic-" +
UUID.randomUUID().toString();
+ final String topicName =
"non-persistent://prop-xyz/ns1/testTruncateTopic3-" +
UUID.randomUUID().toString();
admin.topics().createNonPartitionedTopic(topicName);
assertThrows(() -> {admin.topics().truncate(topicName);});
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 7b66b6a6b51..f9fc717a817 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -373,4 +373,15 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
}
});
}
+
+ @Test
+ public void testHandleNamespaceBeingDeleted() throws Exception {
+ SystemTopicBasedTopicPoliciesService service =
(SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
+
pulsar.getPulsarResources().getNamespaceResources().setPolicies(NamespaceName.get(NAMESPACE1),
+ old -> {
+ old.deleted = true;
+ return old;
+ });
+ service.deleteTopicPoliciesAsync(TOPIC1).get();
+ }
}