This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 156682a [managed-ledger] close bk-client factory gracefully (#4580)
156682a is described below
commit 156682a7609ba416552160ee5cbf5fe774ade1a7
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Jun 24 18:16:12 2019 -0700
[managed-ledger] close bk-client factory gracefully (#4580)
### Motivation
User can create tools on bookkeeper using ManagedLedger factory which
provides
[constructor](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java#L121)
to create ml-factory using self-managed bookkeeper (it's not used by broker).
So, in case of self-managed bk-client, ML-Factory couldn't shutdown it
gracefully and we see issue: #4573
### Modification
- ML-Factory creates `DefaultBkFactory` to create self-managed bk-client
and shutdowns same bk-client while closing the resource.
---
.../mledger/impl/ManagedLedgerFactoryImpl.java | 33 ++++++++++++++--------
1 file changed, 21 insertions(+), 12 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 95ee6b4..39ff102 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Maps;
import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -116,15 +117,8 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
}
private ManagedLedgerFactoryImpl(ZooKeeper zkc, ClientConfiguration
bkClientConfiguration,
- ManagedLedgerFactoryConfig config)
- throws Exception {
- this((policyConfig) -> {
- try {
- return new BookKeeper(bkClientConfiguration, zkc);
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }, true /* isBookkeeperManaged */, zkc, config);
+ ManagedLedgerFactoryConfig config) throws Exception {
+ this(new DefaultBkFactory(bkClientConfiguration, zkc), true /*
isBookkeeperManaged */, zkc, config);
}
public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper
zooKeeper) throws Exception {
@@ -171,6 +165,21 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
cacheEvictionExecutor.execute(this::cacheEvictionTask);
}
+ static class DefaultBkFactory implements
BookkeeperFactoryForCustomEnsemblePlacementPolicy {
+
+ private final BookKeeper bkClient;
+
+ public DefaultBkFactory(ClientConfiguration bkClientConfiguration,
ZooKeeper zkc)
+ throws BKException, IOException, InterruptedException {
+ bkClient = new BookKeeper(bkClientConfiguration, zkc);
+ }
+
+ @Override
+ public BookKeeper get(EnsemblePlacementPolicyConfig policy) {
+ return bkClient;
+ }
+ }
+
private synchronized void refreshStats() {
long now = System.nanoTime();
long period = now - lastStatTimestamp;
@@ -428,9 +437,9 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
if (isBookkeeperManaged) {
try {
- BookKeeper bkFactory = bookkeeperFactory.get();
- if (bkFactory != null) {
- bkFactory.close();
+ BookKeeper bookkeeper = bookkeeperFactory.get();
+ if (bookkeeper != null) {
+ bookkeeper.close();
}
} catch (BKException e) {
throw new ManagedLedgerException(e);