This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 324ce900c4cf6614c0f0e84029d0dc13aef2c9dc Author: Shen Liu <[email protected]> AuthorDate: Thu Apr 21 11:54:34 2022 +0800 Fix topic closed normally but still call `closeFencedTopicForcefully`. (#15196) (#15202) Co-authored-by: druidliu <[email protected]> Fixes #15196. ### Motivation If the broker is configured with `topicFencingTimeoutSeconds` > 0 and a topic is triggered to close and then closes normally, `closeFencedTopicForcefully` should not be called. ### Modifications Cancel the fenced topic monitoring task when the topic closes normally, which cancels the scheduled `closeFencedTopicForcefully` call. ### Verifying this change - [ ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: - Add `org.apache.pulsar.broker.service.PersistentTopicTest#testTopicCloseFencingTimeout` ### Does this pull request potentially affect one of the following parts: *If `yes` was chosen, please highlight the changes* - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API: (yes / no) - The schema: (yes / no / don't know) - The default values of configurations: (yes / no) - The wire protocol: (yes / no) - The rest endpoints: (yes / no) - The admin cli options: (yes / no) - Anything that affects deployment: (yes / no / don't know) ### Documentation Check the box below or label this PR directly. Need to update docs? 
- [ ] `doc-required` - [x] `no-need-doc` - [ ] `doc` - [ ] `doc-added` (cherry picked from commit e4a8de1eb9605e36060f0740a0097203a21e34ef) --- .../broker/service/persistent/PersistentTopic.java | 13 +++++++++---- .../pulsar/broker/service/PersistentTopicTest.java | 22 ++++++++++++++++++++++ 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c93eda3aada..6746320e642 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1283,6 +1283,7 @@ public class PersistentTopic extends AbstractTopic unregisterTopicPolicyListener(); log.info("[{}] Topic closed", topic); + cancelFencedTopicMonitoringTask(); closeFuture.complete(null); }) .exceptionally(ex -> { @@ -2946,6 +2947,13 @@ public class PersistentTopic extends AbstractTopic return true; } + private synchronized void cancelFencedTopicMonitoringTask() { + ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask; + if (monitoringTask != null && !monitoringTask.isDone()) { + monitoringTask.cancel(false); + } + } + private synchronized void fence() { isFenced = true; ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask; @@ -2960,10 +2968,7 @@ public class PersistentTopic extends AbstractTopic private synchronized void unfence() { isFenced = false; - ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask; - if (monitoringTask != null && !monitoringTask.isDone()) { - monitoringTask.cancel(false); - } + cancelFencedTopicMonitoringTask(); } private void closeFencedTopicForcefully() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 3f530a53804..4599d0dfcf3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -2175,6 +2175,28 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { assertTrue((boolean) isClosingOrDeletingField.get(topic)); } + @Test + public void testTopicCloseFencingTimeout() throws Exception { + pulsar.getConfiguration().setTopicFencingTimeoutSeconds(10); + Method fence = PersistentTopic.class.getDeclaredMethod("fence"); + fence.setAccessible(true); + Field fencedTopicMonitoringTaskField = PersistentTopic.class.getDeclaredField("fencedTopicMonitoringTask"); + fencedTopicMonitoringTaskField.setAccessible(true); + + // create topic + PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get(); + + // fence topic to init fencedTopicMonitoringTask + fence.invoke(topic); + + // close topic + topic.close().get(); + assertFalse(brokerService.getTopicReference(successTopicName).isPresent()); + ScheduledFuture<?> fencedTopicMonitoringTask = (ScheduledFuture<?>) fencedTopicMonitoringTaskField.get(topic); + assertTrue(fencedTopicMonitoringTask.isDone()); + assertTrue(fencedTopicMonitoringTask.isCancelled()); + } + @Test public void testGetDurableSubscription() throws Exception { ManagedLedger mockLedger = mock(ManagedLedger.class);
