This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 968dad2b68d289905036873c7f8c86611584e198
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Sun Jan 5 19:57:51 2020 -0800

    [pulsar-broker] close managed-ledgers before giving up bundle ownership to 
avoid bad zk-version (#5599)
    
    ### Motivation
    
    We have seen multiple occurrences (see the log below) where unloading a 
topic doesn't complete and gets stuck. The broker gives up ownership after a 
timeout, and closing the ml-factory then closes the still-unclosed 
managed-ledger, which corrupts the metadata zk-version. As a result, the topic, 
now owned by a new broker, keeps failing with the exception: 
`ManagedLedgerException$BadVersionException`
    
    Right now, while unloading a bundle, the broker removes ownership of the 
bundle after the timeout even if the topic's managed-ledger has not been closed 
successfully. `ManagedLedgerFactoryImpl` then closes the unclosed 
managed-ledger on broker shutdown, which causes a bad zk-version on the new 
broker; because of that, cursors are not able to update their cursor-metadata 
in zk.
    
    ```
    01:01:13.452 [shutdown-thread-57-1] INFO  
org.apache.pulsar.broker.namespace.OwnedBundle - Disabling ownership: 
my-property/my-cluster/my-ns/0xd0000000_0xe0000000
    :
    01:01:13.653 [shutdown-thread-57-1] INFO  
org.apache.pulsar.broker.service.BrokerService - 
[persistent://my-property/my-cluster/my-ns/topic-partition-53] Unloading topic
    :
    01:02:13.677 [shutdown-thread-57-1] INFO  
org.apache.pulsar.broker.namespace.OwnedBundle - Unloading 
my-property/my-cluster/my-ns/0xd0000000_0xe0000000 namespace-bundle with 0 
topics completed in 60225.0 ms
    :
    01:02:13.675 [shutdown-thread-57-1] ERROR 
org.apache.pulsar.broker.namespace.OwnedBundle - Failed to close topics in 
namespace my-property/my-cluster/my-ns/0xd0000000_0xe0000000 in 1/MINUTES 
timeout
    01:02:13.677 [pulsar-ordered-OrderedExecutor-7-0-EventThread] INFO  
org.apache.pulsar.broker.namespace.OwnershipCache - 
[/namespace/my-property/my-cluster/my-ns/0xd0000000_0xe0000000] Removed zk lock 
for service unit: OK
    :
    01:02:14.404 [shutdown-thread-57-1] INFO  
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - 
[my-property/my-cluster/my-ns/persistent/topic-partition-53] Closing managed 
ledger
    ```
    
    ### Modification
    
    This fix makes sure that the broker closes the managed-ledger before giving 
up bundle ownership, to avoid the exception below on the new broker that the 
bundle moves to:
    ```
    
    01:02:30.995 [bookkeeper-ml-workers-OrderedExecutor-3-0] ERROR 
org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - 
[my-property/my-cluster/my-ns/persistent/topic-partition-53][my-sub] Metadata 
ledger creation failed
    org.apache.bookkeeper.mledger.ManagedLedgerException$BadVersionException: 
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = 
BadVersion
    Caused by: org.apache.zookeeper.KeeperException$BadVersionException: 
KeeperErrorCode = BadVersion
            at 
org.apache.zookeeper.KeeperException.create(KeeperException.java:118) 
~[zookeeper-3.4.13.jar:3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03]
            at 
org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$125(MetaStoreImplZookeeper.java:288)
 ~[managed-ledger-original-2.4.5-yahoo.jar:2.4.5-yahoo]
            at 
org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) 
[managed-ledger-original-2.4.5-yahoo.jar:2.4.5-yahoo]
            at 
org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) 
[bookkeeper-common-4.9.0.jar:4.9.0]
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
            at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 [netty-all-4.1.32.Final.jar:4.1.32.Final]
            at java.lang.Thread.run(Thread.java:834) [?:?]
    ```
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  2 +-
 .../pulsar/broker/namespace/OwnedBundle.java       |  8 ++-
 .../pulsar/broker/service/BrokerService.java       |  5 +-
 .../org/apache/pulsar/broker/service/Topic.java    |  2 +-
 .../service/nonpersistent/NonPersistentTopic.java  | 59 ++++++++++++----------
 .../broker/service/persistent/PersistentTopic.java | 16 ++++--
 .../broker/namespace/NamespaceServiceTest.java     |  4 +-
 .../broker/namespace/OwnershipCacheTest.java       |  4 +-
 .../pulsar/broker/service/BrokerServiceTest.java   | 49 ++++++++++++++++++
 .../apache/pulsar/client/api/TopicReaderTest.java  |  2 +-
 .../apache/pulsar/compaction/CompactionTest.java   |  4 +-
 11 files changed, 113 insertions(+), 42 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ac1a359..0add56e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1898,7 +1898,7 @@ public class PersistentTopicsBase extends AdminResource {
         validateTopicOwnership(topicName, authoritative);
         try {
             Topic topic = getTopicReference(topicName);
-            topic.close().get();
+            topic.close(false).get();
             log.info("[{}] Successfully unloaded topic {}", clientAppId(), 
topicName);
         } catch (NullPointerException e) {
             log.error("[{}] topic {} not found", clientAppId(), topicName);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java
index 4d9ef19..eb73295 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java
@@ -122,11 +122,17 @@ public class OwnedBundle {
 
             // close topics forcefully
             try {
-                unloadedTopics = 
pulsar.getBrokerService().unloadServiceUnit(bundle).get(timeout, timeoutUnit);
+                unloadedTopics = 
pulsar.getBrokerService().unloadServiceUnit(bundle, false).get(timeout, 
timeoutUnit);
             } catch (TimeoutException e) {
                 // ignore topic-close failure to unload bundle
                 LOG.error("Failed to close topics in namespace {} in {}/{} 
timeout", bundle.toString(), timeout,
                         timeoutUnit);
+                try {
+                    LOG.info("Forcefully close topics for bundle {}", bundle);
+                    pulsar.getBrokerService().unloadServiceUnit(bundle, 
true).get(timeout, timeoutUnit);
+                } catch (Exception e1) {
+                    LOG.error("Failed to close topics forcefully under bundle 
{}", bundle, e1);
+                }
             } catch (Exception e) {
                 // ignore topic-close failure to unload bundle
                 LOG.error("Failed to close topics under namespace {}", 
bundle.toString(), e);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b9fd0a7..eb8a05f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1214,9 +1214,10 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
      * Unload all the topic served by the broker service under the given 
service unit
      *
      * @param serviceUnit
+     * @param closeWithoutWaitingClientDisconnect don't wait for clients to 
disconnect and forcefully close managed-ledger
      * @return
      */
-    public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle 
serviceUnit) {
+    public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle 
serviceUnit, boolean closeWithoutWaitingClientDisconnect) {
         CompletableFuture<Integer> result = new CompletableFuture<Integer>();
         List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
         topics.forEach((name, topicFuture) -> {
@@ -1225,7 +1226,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
                 // Topic needs to be unloaded
                 log.info("[{}] Unloading topic", topicName);
                 closeFutures.add(topicFuture
-                        .thenCompose(t -> t.isPresent() ? t.get().close() : 
CompletableFuture.completedFuture(null)));
+                        .thenCompose(t -> t.isPresent() ? 
t.get().close(closeWithoutWaitingClientDisconnect) : 
CompletableFuture.completedFuture(null)));
             }
         });
         CompletableFuture<Void> aggregator = 
FutureUtil.waitForAll(closeFutures);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 2396780..529b746 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -121,7 +121,7 @@ public interface Topic {
 
     CompletableFuture<Void> checkReplication();
 
-    CompletableFuture<Void> close();
+    CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect);
 
     void checkGC(int gcInterval);
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 4f5d91f..6f054ca 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -149,7 +149,8 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
             schemaValidationEnforced = policies.schema_validation_enforced;
 
         } catch (Exception e) {
-            log.warn("[{}] Error getting policies {} and isEncryptionRequired 
will be set to false", topic, e.getMessage());
+            log.warn("[{}] Error getting policies {} and isEncryptionRequired 
will be set to false", topic,
+                    e.getMessage());
             isEncryptionRequired = false;
         }
     }
@@ -285,8 +286,8 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
                 name -> new NonPersistentSubscription(this, subscriptionName));
 
         try {
-            Consumer consumer = new Consumer(subscription, subType, topic, 
consumerId, priorityLevel, consumerName, 0, cnx,
-                                             cnx.getRole(), metadata, 
readCompacted, initialPosition, keySharedMeta);
+            Consumer consumer = new Consumer(subscription, subType, topic, 
consumerId, priorityLevel, consumerName, 0,
+                    cnx, cnx.getRole(), metadata, readCompacted, 
initialPosition, keySharedMeta);
             subscription.addConsumer(consumer);
             if (!cnx.isActive()) {
                 consumer.close();
@@ -316,7 +317,8 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
     }
 
     @Override
-    public CompletableFuture<Subscription> createSubscription(String 
subscriptionName, InitialPosition initialPosition, boolean 
replicateSubscriptionState) {
+    public CompletableFuture<Subscription> createSubscription(String 
subscriptionName, InitialPosition initialPosition,
+            boolean replicateSubscriptionState) {
         return CompletableFuture.completedFuture(new 
NonPersistentSubscription(this, subscriptionName));
     }
 
@@ -335,9 +337,8 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
         return delete(false, true, false);
     }
 
-    private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
-                                           boolean closeIfClientsConnected,
-                                           boolean deleteSchema) {
+    private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, 
boolean closeIfClientsConnected,
+            boolean deleteSchema) {
         CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
 
         lock.writeLock().lock();
@@ -418,16 +419,18 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
 
     /**
      * Close this topic - close all producers and subscriptions associated 
with this topic
-     *
+     * 
+     * @param closeWithoutWaitingClientDisconnect
+     *            don't wait for client disconnect and forcefully close 
managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
-    public CompletableFuture<Void> close() {
+    public CompletableFuture<Void> close(boolean 
closeWithoutWaitingClientDisconnect) {
         CompletableFuture<Void> closeFuture = new CompletableFuture<>();
 
         lock.writeLock().lock();
         try {
-            if (!isFenced) {
+            if (!isFenced || closeWithoutWaitingClientDisconnect) {
                 isFenced = true;
             } else {
                 log.warn("[{}] Topic is already being closed or deleted", 
topic);
@@ -444,7 +447,10 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
         producers.values().forEach(producer -> 
futures.add(producer.disconnect()));
         subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
 
-        FutureUtil.waitForAll(futures).thenRun(() -> {
+        CompletableFuture<Void> clientCloseFuture = 
closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null)
+                : FutureUtil.waitForAll(futures);
+
+        clientCloseFuture.thenRun(() -> {
             log.info("[{}] Topic closed", topic);
             // unload topic iterates over topics map and removing from the map 
with the same thread creates deadlock.
             // so, execute it in different thread
@@ -531,10 +537,11 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
     boolean startReplicator(String remoteCluster) {
         log.info("[{}] Starting replicator to remote: {}", topic, 
remoteCluster);
         String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
-        return addReplicationCluster(remoteCluster,NonPersistentTopic.this, 
localCluster);
+        return addReplicationCluster(remoteCluster, NonPersistentTopic.this, 
localCluster);
     }
 
-    protected boolean addReplicationCluster(String remoteCluster, 
NonPersistentTopic nonPersistentTopic, String localCluster) {
+    protected boolean addReplicationCluster(String remoteCluster, 
NonPersistentTopic nonPersistentTopic,
+            String localCluster) {
         AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
         replicators.computeIfAbsent(remoteCluster, r -> {
             try {
@@ -618,8 +625,9 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
         return replicators.get(remoteCluster);
     }
 
-    public void updateRates(NamespaceStats nsStats, NamespaceBundleStats 
bundleStats, StatsOutputStream topicStatsStream,
-            ClusterReplicationMetrics replStats, String namespace, boolean 
hydratePublishers) {
+    public void updateRates(NamespaceStats nsStats, NamespaceBundleStats 
bundleStats,
+            StatsOutputStream topicStatsStream, ClusterReplicationMetrics 
replStats, String namespace,
+            boolean hydratePublishers) {
 
         TopicStats topicStats = threadLocalTopicStats.get();
         topicStats.reset();
@@ -648,7 +656,6 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
         });
         topicStatsStream.endList();
 
-
         // Start replicator stats
         topicStatsStream.startObject("replication");
         nsStats.replicatorCount += topicStats.remotePublishersStats.size();
@@ -859,7 +866,8 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
     @Override
     public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
         if (log.isDebugEnabled()) {
-            log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, 
isEncryptionRequired, data.encryption_required);
+            log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, 
isEncryptionRequired,
+                    data.encryption_required);
         }
         isEncryptionRequired = data.encryption_required;
         setSchemaCompatibilityStrategy(data);
@@ -912,17 +920,14 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
 
     private static final Logger log = 
LoggerFactory.getLogger(NonPersistentTopic.class);
 
-
     @Override
     public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData 
schema) {
-        return hasSchema()
-                .thenCompose((hasSchema) -> {
-                    if (hasSchema || isActive() || 
ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
-                        return checkSchemaCompatibleForConsumer(schema);
-                    } else {
-                        return addSchema(schema).thenCompose(schemaVersion->
-                                CompletableFuture.completedFuture(null));
-                    }
-                });
+        return hasSchema().thenCompose((hasSchema) -> {
+            if (hasSchema || isActive() || 
ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
+                return checkSchemaCompatibleForConsumer(schema);
+            } else {
+                return addSchema(schema).thenCompose(schemaVersion -> 
CompletableFuture.completedFuture(null));
+            }
+        });
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index bbcf2dd..34cad87 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -901,18 +901,25 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         return deleteFuture;
     }
 
+    public CompletableFuture<Void> close() {
+        return close(false);
+    }
+    
     /**
      * Close this topic - close all producers and subscriptions associated 
with this topic
      *
+     * @param closeWithoutWaitingClientDisconnect don't wait for client 
disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
      */
     @Override
-    public CompletableFuture<Void> close() {
+    public CompletableFuture<Void> close(boolean 
closeWithoutWaitingClientDisconnect) {
         CompletableFuture<Void> closeFuture = new CompletableFuture<>();
 
         lock.writeLock().lock();
         try {
-            if (!isFenced) {
+            // closing managed-ledger waits until all 
producers/consumers/replicators get closed. Sometimes, broker
+            // forcefully wants to close managed-ledger without waiting all 
resources to be closed.
+            if (!isFenced || closeWithoutWaitingClientDisconnect) {
                 isFenced = true;
             } else {
                 log.warn("[{}] Topic is already being closed or deleted", 
topic);
@@ -928,8 +935,11 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         replicators.forEach((cluster, replicator) -> 
futures.add(replicator.disconnect()));
         producers.values().forEach(producer -> 
futures.add(producer.disconnect()));
         subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
+        
+        CompletableFuture<Void> clientCloseFuture = 
closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null)
+                : FutureUtil.waitForAll(futures);
 
-        FutureUtil.waitForAll(futures).thenRun(() -> {
+        clientCloseFuture.thenRun(() -> {
             // After having disconnected all producers/consumers, close the 
managed ledger
             ledger.asyncClose(new CloseCallback() {
                 @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index a83897a..fb83ff2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -277,7 +277,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
                 result.completeExceptionally(new RuntimeException("first time 
failed"));
                 return result;
             }
-        }).when(spyTopic).close();
+        }).when(spyTopic).close(false);
         NamespaceBundle bundle = 
pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
         try {
             pulsar.getNamespaceService().unloadNamespaceBundle(bundle);
@@ -316,7 +316,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
             public CompletableFuture<Void> answer(InvocationOnMock invocation) 
throws Throwable {
                 return new CompletableFuture<Void>();
             }
-        }).when(spyTopic).close();
+        }).when(spyTopic).close(false);
         NamespaceBundle bundle = 
pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
 
         // try to unload bundle whose topic will be stuck
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index dd38020..fef3fa8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -19,8 +19,8 @@
 package org.apache.pulsar.broker.namespace;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.pulsar.broker.PulsarService.webAddress;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -88,7 +88,7 @@ public class OwnershipCacheTest {
         bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
         nsService = mock(NamespaceService.class);
         brokerService = mock(BrokerService.class);
-        
doReturn(CompletableFuture.completedFuture(1)).when(brokerService).unloadServiceUnit(any());
+        
doReturn(CompletableFuture.completedFuture(1)).when(brokerService).unloadServiceUnit(any(),
 anyBoolean());
 
         doReturn(zkCache).when(pulsar).getLocalZkCache();
         doReturn(localCache).when(pulsar).getLocalZkCacheService();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 4c9309d..8a6f443 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -49,6 +50,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
@@ -58,6 +60,7 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
@@ -66,6 +69,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -912,4 +916,49 @@ public class BrokerServiceTest extends BrokerTestBase {
         assertEquals(policy.get().bundles.numBundles, totalBundle);
     }
 
+    /**
+     * It verifies that unloading bundle gracefully closes managed-ledger 
before removing ownership to avoid bad-zk
+     * version.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testStuckTopicUnloading() throws Exception {
+        final String namespace = "prop/ns-abc";
+        final String topicName = "persistent://" + namespace + "/unoadTopic";
+        final String topicMlName = namespace + "/persistent/unoadTopic";
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+                .subscribe();
+        consumer.close();
+
+        ProducerBuilder<byte[]> producerBuilder = 
pulsarClient.newProducer().topic(topicName).sendTimeout(5,
+                TimeUnit.SECONDS);
+
+        Producer<byte[]> producer = producerBuilder.create();
+
+        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
+
+        ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) 
pulsar.getManagedLedgerClientFactory()
+                .getManagedLedgerFactory();
+        Field ledgersField = 
ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
+        ledgersField.setAccessible(true);
+        ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> 
ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) 
ledgersField
+                .get(mlFactory);
+        assertNotNull(ledgers.get(topicMlName));
+
+        org.apache.pulsar.broker.service.Producer prod = 
spy(topic.producers.values().get(0));
+        topic.producers.clear();
+        topic.producers.add(prod);
+        CompletableFuture<Void> waitFuture = new CompletableFuture<Void>();
+        doReturn(waitFuture).when(prod).disconnect();
+        Set<NamespaceBundle> bundles = 
pulsar.getNamespaceService().getOwnedServiceUnits();
+        for (NamespaceBundle bundle : bundles) {
+            String ns = bundle.getNamespaceObject().toString();
+            System.out.println();
+            if (namespace.equals(ns)) {
+                pulsar.getNamespaceService().unloadNamespaceBundle(bundle, 2, 
TimeUnit.SECONDS);
+            }
+        }
+        assertNull(ledgers.get(topicMlName));
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 206f4a7..69139b5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -513,7 +513,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         }
 
         // cause broker to drop topic. Will be loaded next time we access it
-        pulsar.getBrokerService().getTopicReference(topic).get().close().get();
+        
pulsar.getBrokerService().getTopicReference(topic).get().close(false).get();
 
         try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
             .startMessageId(MessageId.earliest).create()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index a54ff4c..233246c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -787,7 +787,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         }
 
         // force ledger roll
-        pulsar.getBrokerService().getTopicReference(topic).get().close().get();
+        
pulsar.getBrokerService().getTopicReference(topic).get().close(false).get();
 
         // write a message to avoid issue #1517
         try (Producer<byte[]> producerNormal = 
pulsarClient.newProducer().topic(topic).create()) {
@@ -814,7 +814,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         ledgersOpened.clear();
 
         // force broker to close resources for topic
-        pulsar.getBrokerService().getTopicReference(topic).get().close().get();
+        
pulsar.getBrokerService().getTopicReference(topic).get().close(false).get();
 
         // write a message to avoid issue #1517
         try (Producer<byte[]> producerNormal = 
pulsarClient.newProducer().topic(topic).create()) {

Reply via email to