This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3e44d1e6e2b [improve] PIP-241: add TopicEventListener / topic events
for the BrokerService (#19153)
3e44d1e6e2b is described below
commit 3e44d1e6e2ba4599d547c83cf7cb25350f0cc560
Author: Andrey Yegorov <[email protected]>
AuthorDate: Thu Feb 2 10:04:52 2023 -0800
[improve] PIP-241: add TopicEventListener / topic events for the
BrokerService (#19153)
---
.../pulsar/broker/service/BrokerService.java | 73 ++++-
.../broker/service/TopicEventsDispatcher.java | 137 +++++++++
.../pulsar/broker/service/TopicEventsListener.java | 62 ++++
.../pulsar/broker/TopicEventsListenerTest.java | 311 +++++++++++++++++++++
.../pulsar/broker/service/BrokerTestBase.java | 2 +-
5 files changed, 579 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index f7020963fb7..27a1518cb81 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -110,6 +110,8 @@ import
org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
+import org.apache.pulsar.broker.service.TopicEventsListener.EventStage;
+import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@@ -281,6 +283,8 @@ public class BrokerService implements Closeable {
private Set<BrokerEntryMetadataInterceptor>
brokerEntryMetadataInterceptors;
private Set<ManagedLedgerPayloadProcessor> brokerEntryPayloadProcessors;
+ private final TopicEventsDispatcher topicEventsDispatcher = new
TopicEventsDispatcher();
+
public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup)
throws Exception {
this.pulsar = pulsar;
this.preciseTopicPublishRateLimitingEnable =
@@ -398,6 +402,16 @@ public class BrokerService implements Closeable {
this.bundlesQuotas = new BundlesQuotas(pulsar.getLocalMetadataStore());
}
+    /**
+     * Registers the given listeners with the topic events dispatcher and then sends a
+     * LOAD / SUCCESS event for every topic name currently present in getTopics().
+     * That replay is delivered only to the listeners passed in here, not to listeners that were
+     * registered earlier; presumably this lets late-registered listeners learn about topics that
+     * are already in the broker's topic map.
+     * @param listeners listeners to add
+     */
+    public void addTopicEventListener(TopicEventsListener... listeners) {
+        topicEventsDispatcher.addTopicEventListener(listeners);
+        getTopics().keys().forEach(topic ->
+                TopicEventsDispatcher.notify(listeners, topic, TopicEvent.LOAD, EventStage.SUCCESS, null));
+    }
+
+    /**
+     * Unregisters the given listeners by delegating to the topic events dispatcher.
+     * @param listeners listeners to remove
+     */
+    public void removeTopicEventListener(TopicEventsListener... listeners) {
+        topicEventsDispatcher.removeTopicEventListener(listeners);
+    }
+
// This call is used for starting additional protocol handlers
public void startProtocolHandlers(
Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>>
protocolHandlers) {
@@ -1024,21 +1038,41 @@ public class BrokerService implements Closeable {
return loadOrCreatePersistentTopic(tpName,
createIfMissing, properties);
});
} else {
- return topics.computeIfAbsent(topicName.toString(), (name) -> {
+ return topics.computeIfAbsent(topicName.toString(), (name) -> {
+ topicEventsDispatcher.notify(topicName.toString(),
TopicEvent.LOAD, EventStage.BEFORE);
if (topicName.isPartitioned()) {
final TopicName partitionedTopicName =
TopicName.get(topicName.getPartitionedTopicName());
return
this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata)
-> {
if (topicName.getPartitionIndex() <
metadata.partitions) {
- return createNonPersistentTopic(name);
+ topicEventsDispatcher
+ .notify(topicName.toString(),
TopicEvent.CREATE, EventStage.BEFORE);
+
+ CompletableFuture<Optional<Topic>> res =
createNonPersistentTopic(name);
+
+ CompletableFuture<Optional<Topic>> eventFuture
= topicEventsDispatcher
+ .notifyOnCompletion(res,
topicName.toString(), TopicEvent.CREATE);
+ topicEventsDispatcher
+ .notifyOnCompletion(eventFuture,
topicName.toString(), TopicEvent.LOAD);
+ return res;
}
+ topicEventsDispatcher.notify(topicName.toString(),
TopicEvent.LOAD, EventStage.FAILURE);
return
CompletableFuture.completedFuture(Optional.empty());
});
} else if (createIfMissing) {
- return createNonPersistentTopic(name);
+ topicEventsDispatcher.notify(topicName.toString(),
TopicEvent.CREATE, EventStage.BEFORE);
+
+ CompletableFuture<Optional<Topic>> res =
createNonPersistentTopic(name);
+
+ CompletableFuture<Optional<Topic>> eventFuture =
topicEventsDispatcher
+ .notifyOnCompletion(res, topicName.toString(),
TopicEvent.CREATE);
+ topicEventsDispatcher
+ .notifyOnCompletion(eventFuture,
topicName.toString(), TopicEvent.LOAD);
+ return res;
} else {
+ topicEventsDispatcher.notify(topicName.toString(),
TopicEvent.LOAD, EventStage.FAILURE);
return
CompletableFuture.completedFuture(Optional.empty());
}
- });
+ });
}
} catch (IllegalArgumentException e) {
log.warn("[{}] Illegalargument exception when loading topic",
topicName, e);
@@ -1056,6 +1090,13 @@ public class BrokerService implements Closeable {
}
public CompletableFuture<Void> deleteTopic(String topic, boolean forceDelete) {
+        // Wrap the actual deletion with topic events: DELETE / BEFORE is dispatched now,
+        // DELETE / SUCCESS or DELETE / FAILURE when the future from deleteTopicInternal completes.
+        topicEventsDispatcher.notify(topic, TopicEvent.DELETE, EventStage.BEFORE);
+        CompletableFuture<Void> result = deleteTopicInternal(topic, forceDelete);
+        topicEventsDispatcher.notifyOnCompletion(result, topic, TopicEvent.DELETE);
+        // Callers get the original future; the stage returned by notifyOnCompletion is not used here.
+        return result;
+    }
+
+ private CompletableFuture<Void> deleteTopicInternal(String topic, boolean
forceDelete) {
TopicName topicName = TopicName.get(topic);
Optional<Topic> optTopic = getTopicReference(topic);
@@ -1402,7 +1443,7 @@ public class BrokerService implements Closeable {
log.debug("Broker is unable to load persistent topic {}",
topic);
}
topicFuture.completeExceptionally(new NotAllowedException(
- "Broker is not unable to load persistent topic"));
+ "Broker is unable to load persistent topic"));
return topicFuture;
}
@@ -1542,6 +1583,24 @@ public class BrokerService implements Closeable {
managedLedgerConfig.setShadowSourceName(TopicName.get(shadowSource).getPersistenceNamingEncoding());
}
+ topicEventsDispatcher.notify(topic, TopicEvent.LOAD,
EventStage.BEFORE);
+ // load can fail with topicFuture completed non-exceptionally
+ // work around this
+ final CompletableFuture<Void> loadFuture = new
CompletableFuture<>();
+ topicFuture.whenComplete((res, ex) -> {
+ if (ex == null) {
+ loadFuture.complete(null);
+ } else {
+ loadFuture.completeExceptionally(ex);
+ }
+ });
+
+ if (createIfMissing) {
+ topicEventsDispatcher.notify(topic, TopicEvent.CREATE,
EventStage.BEFORE);
+ topicEventsDispatcher.notifyOnCompletion(topicFuture, topic,
TopicEvent.CREATE);
+ }
+ topicEventsDispatcher.notifyOnCompletion(loadFuture, topic,
TopicEvent.LOAD);
+
// Once we have the configuration, we can proceed with the async
open operation
managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(),
managedLedgerConfig,
new OpenLedgerCallback() {
@@ -1603,6 +1662,7 @@ public class BrokerService implements Closeable {
public void openLedgerFailed(ManagedLedgerException
exception, Object ctx) {
if (!createIfMissing && exception instanceof
ManagedLedgerNotFoundException) {
// We were just trying to load a topic and the
topic doesn't exist
+ loadFuture.completeExceptionally(exception);
topicFuture.complete(Optional.empty());
} else {
log.warn("Failed to create topic {}", topic,
exception);
@@ -2135,6 +2195,8 @@ public class BrokerService implements Closeable {
String bundleName = namespaceBundle.toString();
String namespaceName =
TopicName.get(topic).getNamespaceObject().toString();
+ topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD,
EventStage.BEFORE);
+
synchronized (multiLayerTopicsMap) {
ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String,
Topic>> namespaceMap = multiLayerTopicsMap
.get(namespaceName);
@@ -2169,6 +2231,7 @@ public class BrokerService implements Closeable {
if (compactor != null) {
compactor.getStats().removeTopic(topic);
}
+ topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD,
EventStage.SUCCESS);
}
public int getNumberOfNamespaceBundles() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsDispatcher.java
new file mode 100644
index 00000000000..a706e00db90
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsDispatcher.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Utility class to dispatch topic events to the registered {@link TopicEventsListener}s.
+ * Listeners are kept in a {@link CopyOnWriteArrayList}; an exception thrown by one listener is
+ * logged and does not prevent the remaining listeners from being notified.
+ */
+@Slf4j
+public class TopicEventsDispatcher {
+    private final List<TopicEventsListener> topicEventListeners = new CopyOnWriteArrayList<>();
+
+    /**
+     * Adds listeners, ignores null listeners.
+     * No de-duplication is done: a listener added twice is stored (and notified) twice.
+     * @param listeners listeners to add; the array itself must not be null
+     */
+    public void addTopicEventListener(TopicEventsListener... listeners) {
+        Objects.requireNonNull(listeners);
+        Arrays.stream(listeners)
+                .filter(x -> x != null)
+                .forEach(topicEventListeners::add);
+    }
+
+    /**
+     * Removes listeners, ignores null listeners.
+     * @param listeners listeners to remove; the array itself must not be null
+     */
+    public void removeTopicEventListener(TopicEventsListener... listeners) {
+        Objects.requireNonNull(listeners);
+        Arrays.stream(listeners)
+                .filter(x -> x != null)
+                .forEach(topicEventListeners::remove);
+    }
+
+    /**
+     * Dispatches notification to all currently added listeners, without a Throwable.
+     * @param topic name of the topic the event is about
+     * @param event type of the event
+     * @param stage stage of the event
+     */
+    public void notify(String topic,
+                       TopicEventsListener.TopicEvent event,
+                       TopicEventsListener.EventStage stage) {
+        notify(topic, event, stage, null);
+    }
+
+    /**
+     * Dispatches notification to all currently added listeners.
+     * @param topic name of the topic the event is about
+     * @param event type of the event
+     * @param stage stage of the event
+     * @param t throwable passed through to the listeners; may be null
+     */
+    public void notify(String topic,
+                       TopicEventsListener.TopicEvent event,
+                       TopicEventsListener.EventStage stage,
+                       Throwable t) {
+        topicEventListeners
+                .forEach(listener -> notify(listener, topic, event, stage, t));
+    }
+
+    /**
+     * Dispatches SUCCESS/FAILURE notification to all currently added listeners on completion of the future.
+     * SUCCESS is sent when the future completes normally, FAILURE (with the exception) otherwise.
+     * @param future future whose completion triggers the notification
+     * @param topic name of the topic the event is about
+     * @param event type of the event
+     * @param <T> result type of the future
+     * @return future of a new completion stage
+     */
+    public <T> CompletableFuture<T> notifyOnCompletion(CompletableFuture<T> future,
+                                                       String topic,
+                                                       TopicEventsListener.TopicEvent event) {
+        return future.whenComplete((r, ex) -> notify(topic,
+                event,
+                ex == null ? TopicEventsListener.EventStage.SUCCESS : TopicEventsListener.EventStage.FAILURE,
+                ex));
+    }
+
+    /**
+     * Dispatches notification to specified listeners; null elements of the array are skipped.
+     * @param listeners listeners to notify; the array itself must not be null
+     * @param topic name of the topic the event is about
+     * @param event type of the event
+     * @param stage stage of the event
+     * @param t throwable passed through to the listeners; may be null
+     */
+    public static void notify(TopicEventsListener[] listeners,
+                              String topic,
+                              TopicEventsListener.TopicEvent event,
+                              TopicEventsListener.EventStage stage,
+                              Throwable t) {
+        Objects.requireNonNull(listeners);
+        for (TopicEventsListener listener: listeners) {
+            notify(listener, topic, event, stage, t);
+        }
+    }
+
+    private static void notify(TopicEventsListener listener,
+                               String topic,
+                               TopicEventsListener.TopicEvent event,
+                               TopicEventsListener.EventStage stage,
+                               Throwable t) {
+        if (listener == null) {
+            return;
+        }
+
+        // A misbehaving listener must not break the caller or the other listeners: log and continue.
+        try {
+            listener.handleEvent(topic, event, stage, t);
+        } catch (Throwable ex) {
+            log.error("TopicEventsListener {} exception while handling {}_{} for topic {}",
+                    listener, event, stage, topic, ex);
+        }
+    }
+
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsListener.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsListener.java
new file mode 100644
index 00000000000..8068067206c
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsListener.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+/**
+ * Listener for the Topic events.
+ */
+@InterfaceAudience.LimitedPrivate
+@InterfaceStability.Evolving
+public interface TopicEventsListener {
+
+    /**
+     * Types of events currently supported.
+     * create/load/unload/delete
+     */
+    enum TopicEvent {
+        // create events are reported nested inside the surrounding load events
+        CREATE,
+        LOAD,
+        UNLOAD,
+        DELETE,
+    }
+
+    /**
+     * Stages of events currently supported.
+     * before starting the event/successful completion/failed completion
+     */
+    enum EventStage {
+        BEFORE,
+        SUCCESS,
+        FAILURE
+    }
+
+    /**
+     * Handle topic event.
+     * Choice of the thread / maintenance of the thread pool is up to the event handlers.
+     * @param topicName - name of the topic
+     * @param event - TopicEvent
+     * @param stage - EventStage
+     * @param t - exception in case of FAILURE, if present/known
+     */
+    void handleEvent(String topicName, TopicEvent event, EventStage stage, Throwable t);
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
new file mode 100644
index 00000000000..e6459bbf74c
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import com.google.common.collect.Sets;
+
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Checks which topic events, and in which order, reach a listener registered through
+ * BrokerService#addTopicEventListener when topics are created, unloaded, deleted or
+ * garbage-collected. Only events for {@code topicNameToWatch} are recorded.
+ */
+@Slf4j
+public class TopicEventsListenerTest extends BrokerTestBase {
+
+    // Events seen for topicNameToWatch, recorded as "<EVENT>__<STAGE>" in arrival order.
+    final static Queue<String> events = new ConcurrentLinkedQueue<>();
+    // Written by the test method and read inside the listener lambda registered in setup().
+    volatile String topicNameToWatch;
+    String namespace;
+
+    /** Persistence type x partitioning x forceDelete flag. */
+    @DataProvider(name = "topicType")
+    public static Object[][] topicType() {
+        return new Object[][] {
+                {"persistent", "partitioned", true},
+                {"persistent", "non-partitioned", true},
+                {"non-persistent", "partitioned", true},
+                {"non-persistent", "non-partitioned", true},
+                {"persistent", "partitioned", false},
+                {"persistent", "non-partitioned", false},
+                {"non-persistent", "partitioned", false},
+                {"non-persistent", "non-partitioned", false}
+        };
+    }
+
+    /** Persistence type x partitioning, for tests that do not delete the topic explicitly. */
+    @DataProvider(name = "topicTypeNoDelete")
+    public static Object[][] topicTypeNoDelete() {
+        return new Object[][] {
+                {"persistent", "partitioned"},
+                {"persistent", "non-partitioned"},
+                {"non-persistent", "partitioned"},
+                {"non-persistent", "non-partitioned"}
+        };
+    }
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+        pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
+
+        // One listener for the whole class; it filters on the topic the current test watches.
+        pulsar.getBrokerService().addTopicEventListener((topic, event, stage, t) -> {
+            log.info("got event {}__{} for topic {}", event, stage, topic);
+            if (topic.equals(topicNameToWatch)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("got event {}__{} for topic {} with detailed stack",
+                            event, stage, topic, new Exception("tracing event source"));
+                }
+                events.add(event.toString() + "__" + stage.toString());
+            }
+        });
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /** Creates a fresh namespace with retention (3, 10) for every test and resets the recorded events. */
+    @BeforeMethod
+    protected void setupTest() throws Exception {
+        namespace = "prop/" + UUID.randomUUID();
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        assertTrue(admin.namespaces().getNamespaces("prop").contains(namespace));
+        admin.namespaces().setRetention(namespace, new RetentionPolicies(3, 10));
+        try (PulsarAdmin admin2 = createPulsarAdmin()) {
+            Awaitility.await().untilAsserted(() ->
+                    assertEquals(admin2.namespaces().getRetention(namespace), new RetentionPolicies(3, 10)));
+        }
+
+        events.clear();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    protected void cleanupTest() throws Exception {
+        deleteNamespaceWithRetry(namespace, true);
+    }
+
+    /** Deleting a loaded topic is expected to report DELETE wrapped around UNLOAD. */
+    @Test(dataProvider = "topicType")
+    public void testEvents(String topicTypePersistence, String topicTypePartitioned,
+                           boolean forceDelete) throws Exception {
+        String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID();
+
+        createTopicAndVerifyEvents(topicTypePartitioned, topicName);
+
+        events.clear();
+        if (topicTypePartitioned.equals("partitioned")) {
+            admin.topics().deletePartitionedTopic(topicName, forceDelete);
+        } else {
+            admin.topics().delete(topicName, forceDelete);
+        }
+
+        Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                Assert.assertEquals(events.toArray(), new String[]{
+                        "DELETE__BEFORE",
+                        "UNLOAD__BEFORE",
+                        "UNLOAD__SUCCESS",
+                        "DELETE__SUCCESS"
+                })
+        );
+    }
+
+    /** After an explicit unload, the later delete is expected to report only DELETE events. */
+    @Test(dataProvider = "topicType")
+    public void testEventsWithUnload(String topicTypePersistence, String topicTypePartitioned,
+                                     boolean forceDelete) throws Exception {
+        String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID();
+
+        createTopicAndVerifyEvents(topicTypePartitioned, topicName);
+
+        events.clear();
+        admin.topics().unload(topicName);
+
+        Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                Assert.assertEquals(events.toArray(), new String[]{
+                        "UNLOAD__BEFORE",
+                        "UNLOAD__SUCCESS"
+                })
+        );
+
+        events.clear();
+        if (topicTypePartitioned.equals("partitioned")) {
+            admin.topics().deletePartitionedTopic(topicName, forceDelete);
+        } else {
+            admin.topics().delete(topicName, forceDelete);
+        }
+
+        Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                Assert.assertEquals(events.toArray(), new String[]{
+                        "DELETE__BEFORE",
+                        "DELETE__SUCCESS"
+                })
+        );
+    }
+
+    /**
+     * Deletes a topic that has a connected producer and consumer: a forced delete is expected to
+     * succeed, a non-forced delete is expected to fail with DELETE__FAILURE.
+     */
+    @Test(dataProvider = "topicType")
+    public void testEventsActiveSub(String topicTypePersistence, String topicTypePartitioned,
+                                    boolean forceDelete) throws Exception {
+        String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID();
+
+        createTopicAndVerifyEvents(topicTypePartitioned, topicName);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        for (int i = 0; i < 10; i++) {
+            producer.send("hello".getBytes());
+        }
+        consumer.receive();
+
+        events.clear();
+        try {
+            if (topicTypePartitioned.equals("partitioned")) {
+                admin.topics().deletePartitionedTopic(topicName, forceDelete);
+            } else {
+                admin.topics().delete(topicName, forceDelete);
+            }
+        } catch (PulsarAdminException e) {
+            // A failure is only acceptable for the non-forced delete of a topic that is in use.
+            if (forceDelete) {
+                throw e;
+            }
+            assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")
+                    || e.getMessage().contains("connected producers/consumers"));
+        }
+
+        final String[] expectedEvents;
+
+        if (forceDelete) {
+            expectedEvents = new String[]{
+                    "DELETE__BEFORE",
+                    "UNLOAD__BEFORE",
+                    "UNLOAD__SUCCESS",
+                    "DELETE__SUCCESS",
+            };
+        } else {
+            expectedEvents = new String[]{
+                    "DELETE__BEFORE",
+                    "DELETE__FAILURE"
+            };
+        }
+
+        Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            // only care about first 4 events max, the rest will be from client recreating deleted topic
+            String[] eventsToArray = (events.size() <= 4)
+                    ? events.toArray(new String[0])
+                    : ArrayUtils.subarray(events.toArray(new String[0]), 0, 4);
+            Assert.assertEquals(eventsToArray, expectedEvents);
+        });
+
+        consumer.close();
+        producer.close();
+    }
+
+    /** Inactive-topic GC is expected to report only UNLOAD events for the watched topic. */
+    @Test(dataProvider = "topicTypeNoDelete")
+    public void testTopicAutoGC(String topicTypePersistence, String topicTypePartitioned) throws Exception {
+        String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID();
+
+        createTopicAndVerifyEvents(topicTypePartitioned, topicName);
+
+        admin.namespaces().setInactiveTopicPolicies(namespace,
+                new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true));
+
+        // Remove retention
+        admin.namespaces().setRetention(namespace, new RetentionPolicies());
+        try (PulsarAdmin admin2 = createPulsarAdmin()) {
+            Awaitility.await().untilAsserted(() ->
+                    assertEquals(admin2.namespaces().getRetention(namespace), new RetentionPolicies()));
+        }
+
+        events.clear();
+
+        runGC();
+
+        Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                Assert.assertEquals(events.toArray(), new String[]{
+                        "UNLOAD__BEFORE",
+                        "UNLOAD__SUCCESS",
+                })
+        );
+    }
+
+    /**
+     * Creates the topic, points topicNameToWatch at it (partition 1 for partitioned topics) and
+     * waits for the expected create/load event sequence.
+     */
+    private void createTopicAndVerifyEvents(String topicTypePartitioned, String topicName) throws Exception {
+        final String[] expectedEvents;
+        if (topicTypePartitioned.equals("partitioned")) {
+            topicNameToWatch = topicName + "-partition-1";
+            admin.topics().createPartitionedTopic(topicName, 2);
+            triggerPartitionsCreation(topicName);
+
+            expectedEvents = new String[]{
+                    "LOAD__BEFORE",
+                    "CREATE__BEFORE",
+                    "CREATE__SUCCESS",
+                    "LOAD__SUCCESS"
+            };
+
+        } else {
+            topicNameToWatch = topicName;
+            admin.topics().createNonPartitionedTopic(topicName);
+
+            // Presumably a first load attempt without create fails before the creating load runs.
+            expectedEvents = new String[]{
+                    "LOAD__BEFORE",
+                    "LOAD__FAILURE",
+                    "LOAD__BEFORE",
+                    "CREATE__BEFORE",
+                    "CREATE__SUCCESS",
+                    "LOAD__SUCCESS"
+            };
+
+        }
+
+        Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                Assert.assertEquals(events.toArray(), expectedEvents));
+    }
+
+    private PulsarAdmin createPulsarAdmin() throws PulsarClientException {
+        return PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .build();
+    }
+
+    /** Opens and immediately closes a producer on the partitioned topic so its partitions get loaded. */
+    private void triggerPartitionsCreation(String topicName) throws Exception {
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+        producer.close();
+    }
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
index 5fd4edd2a30..63f778a44f1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
@@ -79,7 +79,7 @@ public abstract class BrokerTestBase extends
MockedPulsarServiceBaseTest {
}
}
- void runGC() {
+ protected void runGC() {
try {
pulsar.getBrokerService().forEachTopic(topic -> {
if (topic instanceof AbstractTopic) {