This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new af983049ccd [bugfix] Prevent Automatic Topic Creation during namespace deletion (#17609) af983049ccd is described below commit af983049ccd52da1e795032d9a7ba674c6df4b04 Author: Enrico Olivelli <eolive...@apache.org> AuthorDate: Thu Sep 15 17:38:19 2022 +0200 [bugfix] Prevent Automatic Topic Creation during namespace deletion (#17609) --- .../pulsar/broker/service/BrokerService.java | 44 +++++++--- .../apache/pulsar/broker/admin/AdminApi2Test.java | 34 ++++++++ .../pulsar/broker/admin/TopicAutoCreationTest.java | 99 ++++++++++++++++++++++ 3 files changed, 163 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 17ce9c1ee5a..77cc8f11ff5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2766,6 +2766,9 @@ public class BrokerService implements Closeable { if (pulsar.getNamespaceService() == null) { return FutureUtil.failedFuture(new NamingException("namespace service is not ready")); } + Optional<Policies> policies = + pulsar.getPulsarResources().getNamespaceResources() + .getPoliciesIfCached(topicName.getNamespaceObject()); return pulsar.getNamespaceService().checkTopicExists(topicName) .thenCompose(topicExists -> { return fetchPartitionedTopicMetadataAsync(topicName) @@ -2780,10 +2783,12 @@ public class BrokerService implements Closeable { if (metadata.partitions == 0 && !topicExists && !topicName.isPartitioned() - && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName) - && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) { + && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName, policies) + && pulsar.getBrokerService() + .isDefaultTopicTypePartitioned(topicName, policies)) { - pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName) + pulsar.getBrokerService() + .createDefaultPartitionedTopicAsync(topicName, policies) .thenAccept(md -> future.complete(md)) .exceptionally(ex -> { if (ex.getCause() @@ -2813,8 +2818,9 @@ public class BrokerService implements Closeable { } @SuppressWarnings("deprecation") - private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName) { - final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName); + private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName, + Optional<Policies> policies) { + final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName, policies); final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic(); checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0"); @@ -3000,11 +3006,23 @@ public class BrokerService implements Closeable { } public boolean isAllowAutoTopicCreation(final TopicName topicName) { + Optional<Policies> policies = + pulsar.getPulsarResources().getNamespaceResources() + .getPoliciesIfCached(topicName.getNamespaceObject()); + return isAllowAutoTopicCreation(topicName, policies); + } + + public boolean isAllowAutoTopicCreation(final TopicName topicName, final Optional<Policies> policies) { + if (policies.isPresent() && policies.get().deleted) { + log.info("Preventing AutoTopicCreation on a namespace that is being deleted {}", + topicName.getNamespaceObject()); + return false; + } //System topic can always be created automatically if (pulsar.getConfiguration().isSystemTopicEnabled() && isSystemTopic(topicName)) { return true; } - AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName); + AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, policies); if (autoTopicCreationOverride != null) { return autoTopicCreationOverride.isAllowAutoTopicCreation(); } else { @@ -3012,8 +3030,8 @@ public class BrokerService implements Closeable { } } - public boolean isDefaultTopicTypePartitioned(final TopicName topicName) { - AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName); + public boolean isDefaultTopicTypePartitioned(final TopicName topicName, final Optional<Policies> policies) { + AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, policies); if (autoTopicCreationOverride != null) { return TopicType.PARTITIONED.toString().equals(autoTopicCreationOverride.getTopicType()); } else { @@ -3021,8 +3039,8 @@ public class BrokerService implements Closeable { } } - public int getDefaultNumPartitions(final TopicName topicName) { - AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName); + public int getDefaultNumPartitions(final TopicName topicName, final Optional<Policies> policies) { + AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, policies); if (autoTopicCreationOverride != null) { return autoTopicCreationOverride.getDefaultNumPartitions(); } else { @@ -3030,10 +3048,8 @@ public class BrokerService implements Closeable { } } - private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) { - Optional<Policies> policies = - pulsar.getPulsarResources().getNamespaceResources() - .getPoliciesIfCached(topicName.getNamespaceObject()); + private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName, + Optional<Policies> policies) { // If namespace policies have the field set, it will override the broker-level setting if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) { return policies.get().autoTopicCreationOverride; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 4171abc00b7..92eac8342f4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -1846,6 +1846,40 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { } } + @Test + public void testForceDeleteNamespaceWithAutomaticTopicCreation() throws Exception { + conf.setForceDeleteNamespaceAllowed(true); + final String namespaceName = "prop-xyz2/ns1"; + TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")); + admin.tenants().createTenant("prop-xyz2", tenantInfo); + admin.namespaces().createNamespace(namespaceName, 1); + admin.namespaces().setAutoTopicCreation(namespaceName, + AutoTopicCreationOverride.builder() + .allowAutoTopicCreation(true) + .topicType("partitioned") + .defaultNumPartitions(20) + .build()); + final String topic = "persistent://" + namespaceName + "/test" + UUID.randomUUID(); + + // start a consumer, that creates the topic + try (Consumer<Double> consumer = pulsarClient.newConsumer(Schema.DOUBLE).topic(topic) + .subscriptionName("test").autoUpdatePartitions(true).subscribe()) { + + // wait for the consumer to settle + Awaitility.await().ignoreExceptions().untilAsserted(() -> + assertNotNull(admin.topics().getSubscriptions(topic).contains("test"))); + + // verify that the partitioned topic is created + assertEquals(20, admin.topics().getPartitionedTopicMetadata(topic).partitions); + + // the consumer will race with the deletion + // the consumer will try to re-create the partitions + admin.namespaces().deleteNamespace(namespaceName, true); + + assertFalse(admin.namespaces().getNamespaces("prop-xyz2").contains("ns1")); + } + } + @Test public void testUpdateClusterWithProxyUrl() throws Exception { ClusterData cluster = ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java index 09335d43302..7bd15992f64 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java @@ -19,18 +19,34 @@ package org.apache.pulsar.broker.admin; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import java.net.InetSocketAddress; import java.util.List; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.LookupService; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.powermock.reflect.Whitebox; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "broker-admin") +@Slf4j public class TopicAutoCreationTest extends ProducerConsumerBase { @Override @@ -43,6 +59,11 @@ public class TopicAutoCreationTest extends ProducerConsumerBase { super.producerBaseSetup(); } + @Override + protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) { + clientBuilder.operationTimeout(2, TimeUnit.SECONDS); + } + @Override @AfterMethod(alwaysRun = true) protected void cleanup() throws Exception { @@ -85,4 +106,82 @@ public class TopicAutoCreationTest extends ProducerConsumerBase { producer.close(); } + + + @Test + public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion() + throws Exception { + final String namespaceName = "my-property/my-ns"; + final String topic = "persistent://" + namespaceName + "/test-partitioned-topi-auto-creation-" + + UUID.randomUUID().toString(); + + pulsar.getPulsarResources().getNamespaceResources() + .setPolicies(NamespaceName.get(namespaceName), old -> { + old.deleted = true; + return old; + }); + + + LookupService original = Whitebox.getInternalState(pulsarClient, "lookup"); + try { + + // we want to skip the "lookup" phase, because it is blocked by the HTTP API + LookupService mockLookup = mock(LookupService.class); + Whitebox.setInternalState(pulsarClient, "lookup", mockLookup); + when(mockLookup.getPartitionedTopicMetadata(any())).thenAnswer(i -> { + return CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)); + }); + when(mockLookup.getBroker(any())).thenAnswer(i -> { + InetSocketAddress brokerAddress = + new InetSocketAddress(pulsar.getAdvertisedAddress(), pulsar.getBrokerListenPort().get()); + return CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress)); + }); + + // Creating a producer and creating a Consumer may trigger automatic topic + // creation, let's try to create a Producer and a Consumer + try (Producer<byte[]> producer = pulsarClient.newProducer() + .sendTimeout(1, TimeUnit.SECONDS) + .topic(topic) + .create();) { + } catch (PulsarClientException.LookupException expected) { + String msg = "Namespace bundle for topic (%s) not served by this instance"; + log.info("Expected error", expected); + assertTrue(expected.getMessage().contains(String.format(msg, topic))); + } + + try (Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("test") + .subscribe();) { + } catch (PulsarClientException.LookupException expected) { + String msg = "Namespace bundle for topic (%s) not served by this instance"; + log.info("Expected error", expected); + assertTrue(expected.getMessage().contains(String.format(msg, topic))); + } + + + // verify that the topic does not exist + pulsar.getPulsarResources().getNamespaceResources() + .setPolicies(NamespaceName.get(namespaceName), old -> { + old.deleted = false; + return old; + }); + + admin.topics().getList(namespaceName).isEmpty(); + + // create now the topic using auto creation + Whitebox.setInternalState(pulsarClient, "lookup", original); + + try (Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("test") + .subscribe();) { + } + + admin.topics().getList(namespaceName).contains(topic); + } finally { + Whitebox.setInternalState(pulsarClient, "lookup", original); + } + + } }