This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9529850bd48 [fix] Combination of autocreate + forced delete of 
partitioned topic with active consumer leaves topic metadata inconsistent. 
(#17308)
9529850bd48 is described below

commit 9529850bd48557dcba124a157a69b75b8f41da3b
Author: Andrey Yegorov <8622884+dl...@users.noreply.github.com>
AuthorDate: Thu Sep 1 00:57:30 2022 -0700

    [fix] Combination of autocreate + forced delete of partitioned topic with 
active consumer leaves topic metadata inconsistent. (#17308)
---
 .../broker/service/persistent/PersistentTopic.java |   7 +
 .../broker/admin/AdminApiMultiBrokersTest.java     |  98 +++++++++++
 .../apache/pulsar/broker/admin/NamespacesTest.java |   1 +
 .../broker/service/ExclusiveProducerTest.java      |   8 +-
 .../pulsar/client/impl/TopicsConsumerImplTest.java |  28 ----
 .../integration/topics/TestTopicDeletion.java      | 183 +++++++++++++++++++++
 6 files changed, 296 insertions(+), 29 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c8cd487c5ee..4d63ddafd01 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1139,6 +1139,13 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                 .map(PersistentSubscription::getName).toList();
                 return FutureUtil.failedFuture(
                         new TopicBusyException("Topic has subscriptions did 
not catch up: " + backlogSubs));
+            } else if (TopicName.get(topic).isPartitioned()
+                    && (getProducers().size() > 0 || getNumberOfConsumers() > 
0)
+                    && getBrokerService().isAllowAutoTopicCreation(topic)) {
+                // refuse: a forced delete with active clients and auto-creation enabled leaves metadata inconsistent
+                return FutureUtil.failedFuture(
+                        new TopicBusyException("Partitioned topic has active 
consumers or producers and "
+                                + "auto-creation of topic is allowed"));
             }
 
             fenceTopicToCloseOrDelete(); // Avoid clients reconnections while 
deleting
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java
index 2cbff955ecf..5a0bde6f913 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertTrue;
 
@@ -26,6 +28,8 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pulsar.broker.MultiBrokerBaseTest;
@@ -33,9 +37,16 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.loadbalance.LeaderBroker;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.internal.TopicsImpl;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -54,6 +65,7 @@ public class AdminApiMultiBrokersTest extends 
MultiBrokerBaseTest {
     @Override
     protected void doInitConf() throws Exception {
         super.doInitConf();
+        this.conf.setManagedLedgerMaxEntriesPerLedger(10);
     }
 
     @Override
@@ -122,4 +134,90 @@ public class AdminApiMultiBrokersTest extends 
MultiBrokerBaseTest {
         Assert.assertEquals(lookupResultSet.size(), 1);
     }
 
+    @Test
+    public void testForceDeletePartitionedTopicWithSub() throws Exception {
+        final int numPartitions = 10;
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", 
"role2"), Set.of("test"));
+        admin.tenants().createTenant("tenant-xyz", tenantInfo);
+        admin.namespaces().createNamespace("tenant-xyz/ns-abc", 
Set.of("test"));
+
+        admin.namespaces().setAutoTopicCreation("tenant-xyz/ns-abc",
+                AutoTopicCreationOverride.builder()
+                        .allowAutoTopicCreation(true)
+                        .topicType("partitioned")
+                        .defaultNumPartitions(5)
+                        .build());
+
+        RetentionPolicies retention = new RetentionPolicies(10, 10);
+        admin.namespaces().setRetention("tenant-xyz/ns-abc", retention);
+        final String topic = "persistent://tenant-xyz/ns-abc/topic-"
+                + RandomStringUtils.randomAlphabetic(5)
+                + "-testDeletePartitionedTopicWithSub";
+        final String subscriptionName = "sub";
+        ((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, 
numPartitions, true, null).get();
+
+        log.info("Creating producer and consumer");
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subscriptionName)
+                .subscribe();
+
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topic).create();
+
+        log.info("producing messages");
+        for (int i = 0; i < numPartitions * 100; ++i) {
+            producer.newMessage()
+                    .key("" + i)
+                    .value("value-" + i)
+                    .send();
+        }
+        producer.flush();
+        producer.close();
+
+        log.info("consuming some messages");
+        for (int i = 0; i < numPartitions * 5; i++) {
+            Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
+        }
+
+        log.info("trying to delete the topic");
+        try {
+            admin.topics().deletePartitionedTopic(topic, true);
+            fail("expected PulsarAdminException.NotFoundException");
+        } catch (PulsarAdminException e) {
+            assertTrue(e.getMessage().contains("Partitioned topic has active 
consumers or producers"));
+        }
+
+        // check that metadata is still consistent
+        assertEquals(numPartitions, admin.topics().getList("tenant-xyz/ns-abc")
+                .stream().filter(t -> t.contains(topic)).count());
+        assertEquals(numPartitions,
+                pulsar.getPulsarResources().getTopicResources()
+                        
.getExistingPartitions(TopicName.getPartitionedTopicName(topic))
+                        .get()
+                        .stream().filter(t -> t.contains(topic)).count());
+        assertTrue(admin.topics()
+                .getPartitionedTopicList("tenant-xyz/ns-abc")
+                .contains(topic));
+
+        log.info("closing producer and consumer");
+        producer.close();
+        consumer.close();
+
+        log.info("trying to delete the topic again");
+        admin.topics().deletePartitionedTopic(topic, true);
+
+        assertEquals(0, admin.topics().getList("tenant-xyz/ns-abc")
+                .stream().filter(t -> t.contains(topic)).count());
+        assertEquals(0,
+                pulsar.getPulsarResources().getTopicResources()
+                        
.getExistingPartitions(TopicName.getPartitionedTopicName(topic))
+                        .get()
+                        .stream().filter(t -> t.contains(topic)).count());
+        assertFalse(admin.topics()
+                .getPartitionedTopicList("tenant-xyz/ns-abc")
+                .contains(topic));
+
+        log.info("trying to create the topic again");
+        ((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, 
numPartitions, true, null).get();
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index c23407bb447..8caf3a47dd3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -1262,6 +1262,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         pulsarClient.updateServiceUrl(lookupUrl.toString());
         Awaitility.await().untilAsserted(() -> 
assertTrue(consumer.isConnected()));
         pulsar.getConfiguration().setAuthorizationEnabled(true);
+        consumer.close();
         admin.topics().deletePartitionedTopic(topicName, true);
         admin.namespaces().deleteNamespace(namespace);
         admin.tenants().deleteTenant("my-tenants");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index 604abd8d709..338ffc01807 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import io.netty.util.HashedWheelTimer;
 import lombok.Cleanup;
 
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -331,7 +332,12 @@ public class ExclusiveProducerTest extends BrokerTestBase {
         p1.send("msg-1");
 
         if (partitioned) {
-            admin.topics().deletePartitionedTopic(topic, true);
+            try {
+                admin.topics().deletePartitionedTopic(topic, true);
+                fail("expected error because partitioned topic has active 
producer");
+            } catch (PulsarAdminException.ServerSideErrorException e) {
+                // expected
+            }
         } else {
             admin.topics().delete(topic, true);
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 8d068d65114..35679711fde 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -1171,34 +1171,6 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
         }
     }
 
-    @Test(timeOut = testTimeout)
-    public void testAutoDiscoverMultiTopicsPartitions() throws Exception {
-        final String topicName = "persistent://public/default/issue-9585";
-        admin.topics().createPartitionedTopic(topicName, 3);
-        PatternMultiTopicsConsumerImpl<String> consumer = 
(PatternMultiTopicsConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
-                .topicsPattern(topicName)
-                .subscriptionName("sub-issue-9585")
-                .subscribe();
-
-        Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 3);
-        Assert.assertEquals(consumer.getConsumers().size(), 3);
-
-        admin.topics().deletePartitionedTopic(topicName, true);
-        
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
-        Awaitility.await().untilAsserted(() -> {
-            Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 0);
-            Assert.assertEquals(consumer.getConsumers().size(), 0);
-        });
-
-        admin.topics().createPartitionedTopic(topicName, 7);
-        
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
-        Awaitility.await().untilAsserted(() -> {
-            Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 7);
-            Assert.assertEquals(consumer.getConsumers().size(), 7);
-        });
-    }
-
-
     @Test(timeOut = testTimeout)
     public void testPartitionsUpdatesForMultipleTopics() throws Exception {
         final String topicName0 = 
"persistent://public/default/testPartitionsUpdatesForMultipleTopics-0";
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java
new file mode 100644
index 00000000000..0adb414e8f4
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.topics;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.fail;
+
+/**
+ * Test cases for forced deletion of partitioned topics.
+ */
+@Slf4j
+public class TestTopicDeletion extends PulsarTestSuite {
+
+    final private boolean unload = false;
+    final private int numBrokers = 2;
+
+    public void setupCluster() throws Exception {
+        brokerEnvs.put("managedLedgerMaxEntriesPerLedger", "10");
+        brokerEnvs.put("brokerDeleteInactivePartitionedTopicMetadataEnabled", 
"false");
+        brokerEnvs.put("brokerDeleteInactiveTopicsEnabled", "false");
+        this.setupCluster("");
+    }
+
+    protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
+            String clusterName,
+            PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+        specBuilder.numBrokers(numBrokers);
+        specBuilder.enableContainerLog(true);
+        return specBuilder;
+    }
+
+    @Test(dataProvider = "ServiceUrls", timeOut=300_000)
+    public void testPartitionedTopicForceDeletion(Supplier<String> serviceUrl) 
throws Exception {
+
+        log.info("Creating tenant and namespace");
+
+        final String tenant = "test-partitioned-topic-" + randomName(4);
+        final String namespace = tenant + "/ns1";
+        final String topic = "persistent://" + namespace + 
"/partitioned-topic";
+        final int numPartitions = numBrokers * 3;
+        final int numKeys = numPartitions * 50;
+        final String subscriptionName = "sub1";
+
+        this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
+
+        this.createNamespace(namespace);
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "set-clusters", "--clusters", pulsarCluster.getClusterName(), 
namespace);
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "set-retention", "--size", "100M", "--time", "100m", 
namespace);
+
+        this.createPartitionedTopic(topic, numPartitions);
+
+        try (PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl.get()).build()) {
+
+            log.info("Creating consumer");
+            Consumer<byte[]> consumer = client.newConsumer()
+                    .topic(topic)
+                    .subscriptionName(subscriptionName)
+                    .subscribe();
+
+            log.info("Producing messages");
+            try(Producer<byte[]> producer = client.newProducer()
+                .topic(topic)
+                .create()
+            ) {
+                for (int i = 0; i < numKeys; i++) {
+                    producer.newMessage()
+                        .key("" + i)
+                        .value(("value-" + i).getBytes(UTF_8))
+                        .sendAsync();
+                }
+                producer.flush();
+                log.info("Successfully wrote {} values", numKeys);
+            }
+
+            log.info("Consuming half of the messages");
+            for (int i = 0; i < numKeys / 2; i++) {
+                Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
+                log.info("Read value {}", m.getKey());
+            }
+
+            if (unload) {
+                log.info("Unloading topic");
+                pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                        "unload", topic);
+            }
+
+            ContainerExecResult res;
+            log.info("Deleting the topic");
+            try {
+                res = pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                        "delete-partitioned-topic", "--force", topic);
+                assertNotEquals(0, res.getExitCode());
+            } catch (ContainerExecException e) {
+                log.info("Second delete failed with ContainerExecException, 
could be ok", e);
+                if (!e.getMessage().contains("with error code 1")) {
+                    fail("Expected different error code");
+                }
+            }
+
+            log.info("Close the consumer and delete the topic again");
+            consumer.close();
+
+            res = pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                    "delete-partitioned-topic", "--force", topic);
+            assertNotEquals(0, res.getExitCode());
+
+            Thread.sleep(5000);
+            // should succeed
+            log.info("Creating the topic again");
+            this.createPartitionedTopic(topic, numBrokers * 2);
+        }
+    }
+
+
+    private ContainerExecResult createTenantName(final String tenantName,
+                                                 final String 
allowedClusterName,
+                                                 final String adminRoleName) 
throws Exception {
+        ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
+            "tenants", "create", "--allowed-clusters", allowedClusterName,
+            "--admin-roles", adminRoleName, tenantName);
+        assertEquals(0, result.getExitCode());
+        return result;
+    }
+
+    private ContainerExecResult createNamespace(final String Ns) throws 
Exception {
+        ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
+                "namespaces",
+                "create",
+                "--clusters",
+                pulsarCluster.getClusterName(), Ns);
+        assertEquals(0, result.getExitCode());
+        return result;
+    }
+
+    private ContainerExecResult createPartitionedTopic(final String 
partitionedTopicName, int numPartitions)
+            throws Exception {
+        ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
+            "topics",
+            "create-partitioned-topic",
+            "--partitions", "" + numPartitions,
+            partitionedTopicName);
+        assertEquals(0, result.getExitCode());
+        return result;
+    }
+
+
+}

Reply via email to