This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 156682a  [managed-ledger] close bk-client factory gracefully (#4580)
156682a is described below

commit 156682a7609ba416552160ee5cbf5fe774ade1a7
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Jun 24 18:16:12 2019 -0700

    [managed-ledger] close bk-client factory gracefully (#4580)
    
    ### Motivation
    Users can build tools on BookKeeper using the ManagedLedger factory, which 
provides a 
[constructor](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java#L121)
 that creates an ml-factory with a self-managed bookkeeper client (this 
constructor is not used by the broker).
    However, with a self-managed bk-client, the ML-Factory could not shut it 
down gracefully, which causes the issue reported in #4573.
    
    ### Modification
    - ML-Factory creates a `DefaultBkFactory` to create the self-managed 
bk-client, and shuts down that same bk-client when closing the resource.
---
 .../mledger/impl/ManagedLedgerFactoryImpl.java     | 33 ++++++++++++++--------
 1 file changed, 21 insertions(+), 12 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 95ee6b4..39ff102 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Maps;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -116,15 +117,8 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
     }
 
     private ManagedLedgerFactoryImpl(ZooKeeper zkc, ClientConfiguration 
bkClientConfiguration,
-            ManagedLedgerFactoryConfig config)
-            throws Exception {
-        this((policyConfig) -> {
-            try {
-                return new BookKeeper(bkClientConfiguration, zkc);
-            } catch (Exception e) {
-                throw new IllegalStateException(e);
-            }
-        }, true /* isBookkeeperManaged */, zkc, config);
+            ManagedLedgerFactoryConfig config) throws Exception {
+        this(new DefaultBkFactory(bkClientConfiguration, zkc), true /* 
isBookkeeperManaged */, zkc, config);
     }
 
     public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper 
zooKeeper) throws Exception {
@@ -171,6 +165,21 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         cacheEvictionExecutor.execute(this::cacheEvictionTask);
     }
 
+    static class DefaultBkFactory implements 
BookkeeperFactoryForCustomEnsemblePlacementPolicy {
+
+        private final BookKeeper bkClient;
+
+        public DefaultBkFactory(ClientConfiguration bkClientConfiguration, 
ZooKeeper zkc)
+                throws BKException, IOException, InterruptedException {
+            bkClient = new BookKeeper(bkClientConfiguration, zkc);
+        }
+
+        @Override
+        public BookKeeper get(EnsemblePlacementPolicyConfig policy) {
+            return bkClient;
+        }
+    }
+    
     private synchronized void refreshStats() {
         long now = System.nanoTime();
         long period = now - lastStatTimestamp;
@@ -428,9 +437,9 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
 
         if (isBookkeeperManaged) {
             try {
-                BookKeeper bkFactory = bookkeeperFactory.get();
-                if (bkFactory != null) {
-                    bkFactory.close();
+                BookKeeper bookkeeper = bookkeeperFactory.get();
+                if (bookkeeper != null) {
+                    bookkeeper.close();
                 }
             } catch (BKException e) {
                 throw new ManagedLedgerException(e);

Reply via email to