This is an automated email from the ASF dual-hosted git repository. guangning pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 968dad2b68d289905036873c7f8c86611584e198 Author: Rajan Dhabalia <[email protected]> AuthorDate: Sun Jan 5 19:57:51 2020 -0800 [pulsar-broker] close managed-ledgers before giving up bundle ownership to avoid bad zk-version (#5599) ### Motivation We have seen multiple occurrences (see the log below) where unloading a topic doesn't complete and gets stuck. The broker then gives up bundle ownership after a timeout, and closing the ml-factory later closes the still-unclosed managed-ledger, which corrupts the metadata zk-version; the topic, now owned by the new broker, keeps failing with the exception `ManagedLedgerException$BadVersionException`. Right now, while unloading a bundle, the broker removes ownership of the bundle after the timeout even if the topic's managed-ledger was not closed successfully, and `ManagedLedgerFactoryImpl` closes that unclosed managed-ledger on broker shutdown. This causes a bad zk-version on the new broker, and because of that, cursors are not able to update cursor-metadata in zk. ``` 01:01:13.452 [shutdown-thread-57-1] INFO org.apache.pulsar.broker.namespace.OwnedBundle - Disabling ownership: my-property/my-cluster/my-ns/0xd0000000_0xe0000000 : 01:01:13.653 [shutdown-thread-57-1] INFO org.apache.pulsar.broker.service.BrokerService - [persistent://my-property/my-cluster/my-ns/topic-partition-53] Unloading topic : 01:02:13.677 [shutdown-thread-57-1] INFO org.apache.pulsar.broker.namespace.OwnedBundle - Unloading my-property/my-cluster/my-ns/0xd0000000_0xe0000000 namespace-bundle with 0 topics completed in 60225.0 ms : 01:02:13.675 [shutdown-thread-57-1] ERROR org.apache.pulsar.broker.namespace.OwnedBundle - Failed to close topics in namespace my-property/my-cluster/my-ns/0xd0000000_0xe0000000 in 1/MINUTES timeout 01:02:13.677 [pulsar-ordered-OrderedExecutor-7-0-EventThread] INFO org.apache.pulsar.broker.namespace.OwnershipCache - [/namespace/my-property/my-cluster/my-ns/0xd0000000_0xe0000000] Removed zk lock for service unit: OK : 01:02:14.404 [shutdown-thread-57-1] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - 
[my-property/my-cluster/my-ns/persistent/topic-partition-53] Closing managed ledger ``` ### Modification This fix makes sure that the broker closes the managed-ledger before giving up bundle ownership, to avoid the following exception on the new broker that the bundle moves to: ``` 01:02:30.995 [bookkeeper-ml-workers-OrderedExecutor-3-0] ERROR org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-cluster/my-ns/persistent/topic-partition-53][my-sub] Metadata ledger creation failed org.apache.bookkeeper.mledger.ManagedLedgerException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion Caused by: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion at org.apache.zookeeper.KeeperException.create(KeeperException.java:118) ~[zookeeper-3.4.13.jar:3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03] at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$125(MetaStoreImplZookeeper.java:288) ~[managed-ledger-original-2.4.5-yahoo.jar:2.4.5-yahoo] at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [managed-ledger-original-2.4.5-yahoo.jar:2.4.5-yahoo] at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [bookkeeper-common-4.9.0.jar:4.9.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.32.Final.jar:4.1.32.Final] at java.lang.Thread.run(Thread.java:834) [?:?] 
``` --- .../broker/admin/impl/PersistentTopicsBase.java | 2 +- .../pulsar/broker/namespace/OwnedBundle.java | 8 ++- .../pulsar/broker/service/BrokerService.java | 5 +- .../org/apache/pulsar/broker/service/Topic.java | 2 +- .../service/nonpersistent/NonPersistentTopic.java | 59 ++++++++++++---------- .../broker/service/persistent/PersistentTopic.java | 16 ++++-- .../broker/namespace/NamespaceServiceTest.java | 4 +- .../broker/namespace/OwnershipCacheTest.java | 4 +- .../pulsar/broker/service/BrokerServiceTest.java | 49 ++++++++++++++++++ .../apache/pulsar/client/api/TopicReaderTest.java | 2 +- .../apache/pulsar/compaction/CompactionTest.java | 4 +- 11 files changed, 113 insertions(+), 42 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index ac1a359..0add56e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1898,7 +1898,7 @@ public class PersistentTopicsBase extends AdminResource { validateTopicOwnership(topicName, authoritative); try { Topic topic = getTopicReference(topicName); - topic.close().get(); + topic.close(false).get(); log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName); } catch (NullPointerException e) { log.error("[{}] topic {} not found", clientAppId(), topicName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java index 4d9ef19..eb73295 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java @@ -122,11 +122,17 @@ public class OwnedBundle { // close topics forcefully try { - 
unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(bundle).get(timeout, timeoutUnit); + unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(bundle, false).get(timeout, timeoutUnit); } catch (TimeoutException e) { // ignore topic-close failure to unload bundle LOG.error("Failed to close topics in namespace {} in {}/{} timeout", bundle.toString(), timeout, timeoutUnit); + try { + LOG.info("Forcefully close topics for bundle {}", bundle); + pulsar.getBrokerService().unloadServiceUnit(bundle, true).get(timeout, timeoutUnit); + } catch (Exception e1) { + LOG.error("Failed to close topics forcefully under bundle {}", bundle, e1); + } } catch (Exception e) { // ignore topic-close failure to unload bundle LOG.error("Failed to close topics under namespace {}", bundle.toString(), e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index b9fd0a7..eb8a05f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1214,9 +1214,10 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies * Unload all the topic served by the broker service under the given service unit * * @param serviceUnit + * @param closeWithoutWaitingClientDisconnect don't wait for clients to disconnect and forcefully close managed-ledger * @return */ - public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit) { + public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit, boolean closeWithoutWaitingClientDisconnect) { CompletableFuture<Integer> result = new CompletableFuture<Integer>(); List<CompletableFuture<Void>> closeFutures = Lists.newArrayList(); topics.forEach((name, topicFuture) -> { @@ -1225,7 +1226,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies // Topic needs to be unloaded log.info("[{}] Unloading topic", topicName); closeFutures.add(topicFuture - .thenCompose(t -> t.isPresent() ? t.get().close() : CompletableFuture.completedFuture(null))); + .thenCompose(t -> t.isPresent() ? t.get().close(closeWithoutWaitingClientDisconnect) : CompletableFuture.completedFuture(null))); } }); CompletableFuture<Void> aggregator = FutureUtil.waitForAll(closeFutures); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 2396780..529b746 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -121,7 +121,7 @@ public interface Topic { CompletableFuture<Void> checkReplication(); - CompletableFuture<Void> close(); + CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect); void checkGC(int gcInterval); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 4f5d91f..6f054ca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -149,7 +149,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { schemaValidationEnforced = policies.schema_validation_enforced; } catch (Exception e) { - log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage()); + log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, + e.getMessage()); isEncryptionRequired = false; } } @@ -285,8 +286,8 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic { name -> new NonPersistentSubscription(this, subscriptionName)); try { - Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, cnx, - cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta); + Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, + cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta); subscription.addConsumer(consumer); if (!cnx.isActive()) { consumer.close(); @@ -316,7 +317,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { } @Override - public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition, boolean replicateSubscriptionState) { + public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition, + boolean replicateSubscriptionState) { return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName)); } @@ -335,9 +337,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { return delete(false, true, false); } - private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, - boolean closeIfClientsConnected, - boolean deleteSchema) { + private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean closeIfClientsConnected, + boolean deleteSchema) { CompletableFuture<Void> deleteFuture = new CompletableFuture<>(); lock.writeLock().lock(); @@ -418,16 +419,18 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { /** * Close this topic - close all producers and subscriptions associated with this topic - * + * + * @param closeWithoutWaitingClientDisconnect + * don't wait for client disconnect and forcefully close managed-ledger * @return Completable future indicating completion of close operation */ @Override - public CompletableFuture<Void> close() { + public 
CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) { CompletableFuture<Void> closeFuture = new CompletableFuture<>(); lock.writeLock().lock(); try { - if (!isFenced) { + if (!isFenced || closeWithoutWaitingClientDisconnect) { isFenced = true; } else { log.warn("[{}] Topic is already being closed or deleted", topic); @@ -444,7 +447,10 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { producers.values().forEach(producer -> futures.add(producer.disconnect())); subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); - FutureUtil.waitForAll(futures).thenRun(() -> { + CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null) + : FutureUtil.waitForAll(futures); + + clientCloseFuture.thenRun(() -> { log.info("[{}] Topic closed", topic); // unload topic iterates over topics map and removing from the map with the same thread creates deadlock. // so, execute it in different thread @@ -531,10 +537,11 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { boolean startReplicator(String remoteCluster) { log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster); String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); - return addReplicationCluster(remoteCluster,NonPersistentTopic.this, localCluster); + return addReplicationCluster(remoteCluster, NonPersistentTopic.this, localCluster); } - protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic, String localCluster) { + protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic, + String localCluster) { AtomicBoolean isReplicatorStarted = new AtomicBoolean(true); replicators.computeIfAbsent(remoteCluster, r -> { try { @@ -618,8 +625,9 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { return replicators.get(remoteCluster); } - public void 
updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream topicStatsStream, - ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) { + public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, + StatsOutputStream topicStatsStream, ClusterReplicationMetrics replStats, String namespace, + boolean hydratePublishers) { TopicStats topicStats = threadLocalTopicStats.get(); topicStats.reset(); @@ -648,7 +656,6 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { }); topicStatsStream.endList(); - // Start replicator stats topicStatsStream.startObject("replication"); nsStats.replicatorCount += topicStats.remotePublishersStats.size(); @@ -859,7 +866,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { @Override public CompletableFuture<Void> onPoliciesUpdate(Policies data) { if (log.isDebugEnabled()) { - log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required); + log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, + data.encryption_required); } isEncryptionRequired = data.encryption_required; setSchemaCompatibilityStrategy(data); @@ -912,17 +920,14 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class); - @Override public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) { - return hasSchema() - .thenCompose((hasSchema) -> { - if (hasSchema || isActive() || ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) { - return checkSchemaCompatibleForConsumer(schema); - } else { - return addSchema(schema).thenCompose(schemaVersion-> - CompletableFuture.completedFuture(null)); - } - }); + return hasSchema().thenCompose((hasSchema) -> { + if (hasSchema || isActive() || ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) { + return 
checkSchemaCompatibleForConsumer(schema); + } else { + return addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null)); + } + }); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index bbcf2dd..34cad87 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -901,18 +901,25 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return deleteFuture; } + public CompletableFuture<Void> close() { + return close(false); + } + /** * Close this topic - close all producers and subscriptions associated with this topic * + * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger * @return Completable future indicating completion of close operation */ @Override - public CompletableFuture<Void> close() { + public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) { CompletableFuture<Void> closeFuture = new CompletableFuture<>(); lock.writeLock().lock(); try { - if (!isFenced) { + // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker + // forcefully wants to close managed-ledger without waiting all resources to be closed. 
+ if (!isFenced || closeWithoutWaitingClientDisconnect) { isFenced = true; } else { log.warn("[{}] Topic is already being closed or deleted", topic); @@ -928,8 +935,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); producers.values().forEach(producer -> futures.add(producer.disconnect())); subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); + + CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null) + : FutureUtil.waitForAll(futures); - FutureUtil.waitForAll(futures).thenRun(() -> { + clientCloseFuture.thenRun(() -> { // After having disconnected all producers/consumers, close the managed ledger ledger.asyncClose(new CloseCallback() { @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index a83897a..fb83ff2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -277,7 +277,7 @@ public class NamespaceServiceTest extends BrokerTestBase { result.completeExceptionally(new RuntimeException("first time failed")); return result; } - }).when(spyTopic).close(); + }).when(spyTopic).close(false); NamespaceBundle bundle = pulsar.getNamespaceService().getBundle(TopicName.get(topicName)); try { pulsar.getNamespaceService().unloadNamespaceBundle(bundle); @@ -316,7 +316,7 @@ public class NamespaceServiceTest extends BrokerTestBase { public CompletableFuture<Void> answer(InvocationOnMock invocation) throws Throwable { return new CompletableFuture<Void>(); } - }).when(spyTopic).close(); + }).when(spyTopic).close(false); NamespaceBundle bundle = 
pulsar.getNamespaceService().getBundle(TopicName.get(topicName)); // try to unload bundle whose topic will be stuck diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java index dd38020..fef3fa8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java @@ -19,8 +19,8 @@ package org.apache.pulsar.broker.namespace; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.pulsar.broker.PulsarService.webAddress; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -88,7 +88,7 @@ public class OwnershipCacheTest { bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32()); nsService = mock(NamespaceService.class); brokerService = mock(BrokerService.class); - doReturn(CompletableFuture.completedFuture(1)).when(brokerService).unloadServiceUnit(any()); + doReturn(CompletableFuture.completedFuture(1)).when(brokerService).unloadServiceUnit(any(), anyBoolean()); doReturn(zkCache).when(pulsar).getLocalZkCache(); doReturn(localCache).when(pulsar).getLocalZkCacheService(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 4c9309d..8a6f443 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static 
org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -49,6 +50,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; @@ -58,6 +60,7 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.auth.AuthenticationTls; @@ -66,6 +69,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -912,4 +916,49 @@ public class BrokerServiceTest extends BrokerTestBase { assertEquals(policy.get().bundles.numBundles, totalBundle); } + /** + * It verifies that unloading bundle gracefully closes managed-ledger before removing ownership to avoid bad-zk + * version. 
+ * + * @throws Exception + */ + @Test + public void testStuckTopicUnloading() throws Exception { + final String namespace = "prop/ns-abc"; + final String topicName = "persistent://" + namespace + "/unoadTopic"; + final String topicMlName = namespace + "/persistent/unoadTopic"; + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name") + .subscribe(); + consumer.close(); + + ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName).sendTimeout(5, + TimeUnit.SECONDS); + + Producer<byte[]> producer = producerBuilder.create(); + + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + + ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerClientFactory() + .getManagedLedgerFactory(); + Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers"); + ledgersField.setAccessible(true); + ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) ledgersField + .get(mlFactory); + assertNotNull(ledgers.get(topicMlName)); + + org.apache.pulsar.broker.service.Producer prod = spy(topic.producers.values().get(0)); + topic.producers.clear(); + topic.producers.add(prod); + CompletableFuture<Void> waitFuture = new CompletableFuture<Void>(); + doReturn(waitFuture).when(prod).disconnect(); + Set<NamespaceBundle> bundles = pulsar.getNamespaceService().getOwnedServiceUnits(); + for (NamespaceBundle bundle : bundles) { + String ns = bundle.getNamespaceObject().toString(); + System.out.println(); + if (namespace.equals(ns)) { + pulsar.getNamespaceService().unloadNamespaceBundle(bundle, 2, TimeUnit.SECONDS); + } + } + assertNull(ledgers.get(topicMlName)); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index 206f4a7..69139b5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -513,7 +513,7 @@ public class TopicReaderTest extends ProducerConsumerBase { } // cause broker to drop topic. Will be loaded next time we access it - pulsar.getBrokerService().getTopicReference(topic).get().close().get(); + pulsar.getBrokerService().getTopicReference(topic).get().close(false).get(); try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic) .startMessageId(MessageId.earliest).create()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index a54ff4c..233246c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -787,7 +787,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { } // force ledger roll - pulsar.getBrokerService().getTopicReference(topic).get().close().get(); + pulsar.getBrokerService().getTopicReference(topic).get().close(false).get(); // write a message to avoid issue #1517 try (Producer<byte[]> producerNormal = pulsarClient.newProducer().topic(topic).create()) { @@ -814,7 +814,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { ledgersOpened.clear(); // force broker to close resources for topic - pulsar.getBrokerService().getTopicReference(topic).get().close().get(); + pulsar.getBrokerService().getTopicReference(topic).get().close(false).get(); // write a message to avoid issue #1517 try (Producer<byte[]> producerNormal = pulsarClient.newProducer().topic(topic).create()) {
