This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d74010c271a [improve] Refactored BK ClientFactory to return futures
(#22853)
d74010c271a is described below
commit d74010c271abfb0a77a4dacf0ab072a957afeb5a
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Jun 5 17:09:32 2024 -0700
[improve] Refactored BK ClientFactory to return futures (#22853)
---
.../mledger/impl/ManagedLedgerFactoryImpl.java | 223 ++++++++++-----------
.../mledger/impl/ManagedLedgerOfflineBacklog.java | 20 +-
.../pulsar/broker/BookKeeperClientFactory.java | 19 +-
.../pulsar/broker/BookKeeperClientFactoryImpl.java | 28 +--
.../pulsar/broker/ManagedLedgerClientFactory.java | 39 ++--
.../bucket/BookkeeperBucketSnapshotStorage.java | 2 +-
.../service/schema/BookkeeperSchemaStorage.java | 2 +-
.../apache/pulsar/compaction/CompactorTool.java | 2 +-
.../broker/MockedBookKeeperClientFactory.java | 18 +-
.../testcontext/MockBookKeeperClientFactory.java | 15 +-
.../pulsar/compaction/CompactedTopicTest.java | 6 +-
.../pulsar/compaction/CompactionRetentionTest.java | 2 +-
.../apache/pulsar/compaction/CompactionTest.java | 2 +-
.../apache/pulsar/compaction/CompactorTest.java | 2 +-
.../compaction/ServiceUnitStateCompactionTest.java | 2 +-
.../compaction/TopicCompactionServiceTest.java | 2 +-
16 files changed, 193 insertions(+), 191 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index d867f2f4c02..ed803a81462 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -161,7 +161,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
BookKeeper bookKeeper,
ManagedLedgerFactoryConfig config)
throws Exception {
- this(metadataStore, (policyConfig) -> bookKeeper, config);
+ this(metadataStore, (policyConfig) ->
CompletableFuture.completedFuture(bookKeeper), config);
}
public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
@@ -233,8 +233,8 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
}
@Override
- public BookKeeper get(EnsemblePlacementPolicyConfig policy) {
- return bkClient;
+ public CompletableFuture<BookKeeper> get(EnsemblePlacementPolicyConfig
policy) {
+ return CompletableFuture.completedFuture(bkClient);
}
}
@@ -378,56 +378,63 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
ledgers.computeIfAbsent(name, (mlName) -> {
// Create the managed ledger
CompletableFuture<ManagedLedgerImpl> future = new
CompletableFuture<>();
- BookKeeper bk = bookkeeperFactory.get(
- new
EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
-
config.getBookKeeperEnsemblePlacementPolicyProperties()));
- final ManagedLedgerImpl newledger = config.getShadowSource() ==
null
- ? new ManagedLedgerImpl(this, bk, store, config,
scheduledExecutor, name, mlOwnershipChecker)
- : new ShadowManagedLedgerImpl(this, bk, store, config,
scheduledExecutor, name,
- mlOwnershipChecker);
- PendingInitializeManagedLedger pendingLedger = new
PendingInitializeManagedLedger(newledger);
- pendingInitializeLedgers.put(name, pendingLedger);
- newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
- @Override
- public void initializeComplete() {
- log.info("[{}] Successfully initialize managed ledger",
name);
- pendingInitializeLedgers.remove(name, pendingLedger);
- future.complete(newledger);
-
- // May need to update the cursor position
- newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
- // May need to trigger offloading
- if (config.isTriggerOffloadOnTopicLoad()) {
-
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
- }
- }
-
- @Override
- public void initializeFailed(ManagedLedgerException e) {
- if (config.isCreateIfMissing()) {
- log.error("[{}] Failed to initialize managed ledger:
{}", name, e.getMessage());
- }
-
- // Clean the map if initialization fails
- ledgers.remove(name, future);
-
- if (pendingInitializeLedgers.remove(name, pendingLedger)) {
- pendingLedger.ledger.asyncClose(new CloseCallback() {
+ bookkeeperFactory.get(
+ new
EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
+
config.getBookKeeperEnsemblePlacementPolicyProperties()))
+ .thenAccept(bk -> {
+ final ManagedLedgerImpl newledger =
config.getShadowSource() == null
+ ? new ManagedLedgerImpl(this, bk, store,
config, scheduledExecutor, name,
+ mlOwnershipChecker)
+ : new ShadowManagedLedgerImpl(this, bk, store,
config, scheduledExecutor, name,
+ mlOwnershipChecker);
+ PendingInitializeManagedLedger pendingLedger = new
PendingInitializeManagedLedger(newledger);
+ pendingInitializeLedgers.put(name, pendingLedger);
+ newledger.initialize(new
ManagedLedgerInitializeLedgerCallback() {
@Override
- public void closeComplete(Object ctx) {
- // no-op
+ public void initializeComplete() {
+ log.info("[{}] Successfully initialize managed
ledger", name);
+ pendingInitializeLedgers.remove(name,
pendingLedger);
+ future.complete(newledger);
+
+ // May need to update the cursor position
+
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
+ // May need to trigger offloading
+ if (config.isTriggerOffloadOnTopicLoad()) {
+
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
+ }
}
@Override
- public void closeFailed(ManagedLedgerException
exception, Object ctx) {
- log.warn("[{}] Failed to a pending
initialization managed ledger", name, exception);
+ public void
initializeFailed(ManagedLedgerException e) {
+ if (config.isCreateIfMissing()) {
+ log.error("[{}] Failed to initialize
managed ledger: {}", name, e.getMessage());
+ }
+
+ // Clean the map if initialization fails
+ ledgers.remove(name, future);
+
+ if (pendingInitializeLedgers.remove(name,
pendingLedger)) {
+ pendingLedger.ledger.asyncClose(new
CloseCallback() {
+ @Override
+ public void closeComplete(Object ctx) {
+ // no-op
+ }
+
+ @Override
+ public void
closeFailed(ManagedLedgerException exception, Object ctx) {
+ log.warn("[{}] Failed to a pending
initialization managed ledger", name,
+ exception);
+ }
+ }, null);
+ }
+
+ future.completeExceptionally(e);
}
}, null);
- }
-
- future.completeExceptionally(e);
- }
- }, null);
+ }).exceptionally(ex -> {
+ future.completeExceptionally(ex);
+ return null;
+ });
return future;
}).thenAccept(ml -> callback.openLedgerComplete(ml,
ctx)).exceptionally(exception -> {
callback.openLedgerFailed((ManagedLedgerException)
exception.getCause(), ctx);
@@ -443,20 +450,22 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
callback.openReadOnlyManagedLedgerFailed(
new
ManagedLedgerException.ManagedLedgerFactoryClosedException(), ctx);
}
- ReadOnlyManagedLedgerImpl roManagedLedger = new
ReadOnlyManagedLedgerImpl(this,
- bookkeeperFactory
- .get(new
EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
-
config.getBookKeeperEnsemblePlacementPolicyProperties())),
- store, config, scheduledExecutor, managedLedgerName);
- roManagedLedger.initialize().thenRun(() -> {
- log.info("[{}] Successfully initialize Read-only managed ledger",
managedLedgerName);
- callback.openReadOnlyManagedLedgerComplete(roManagedLedger, ctx);
-
- }).exceptionally(e -> {
- log.error("[{}] Failed to initialize Read-only managed ledger",
managedLedgerName, e);
- callback.openReadOnlyManagedLedgerFailed((ManagedLedgerException)
e.getCause(), ctx);
- return null;
- });
+
+ bookkeeperFactory
+ .get(new
EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
+
config.getBookKeeperEnsemblePlacementPolicyProperties()))
+ .thenCompose(bk -> {
+ ReadOnlyManagedLedgerImpl roManagedLedger = new
ReadOnlyManagedLedgerImpl(this, bk,
+ store, config, scheduledExecutor,
managedLedgerName);
+ return roManagedLedger.initialize().thenApply(v ->
roManagedLedger);
+ }).thenAccept(roManagedLedger -> {
+ log.info("[{}] Successfully initialize Read-only managed
ledger", managedLedgerName);
+
callback.openReadOnlyManagedLedgerComplete(roManagedLedger, ctx);
+ }).exceptionally(e -> {
+ log.error("[{}] Failed to initialize Read-only managed
ledger", managedLedgerName, e);
+
callback.openReadOnlyManagedLedgerFailed((ManagedLedgerException) e.getCause(),
ctx);
+ return null;
+ });
}
@Override
@@ -578,49 +587,35 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
ledgerFuture.completeExceptionally(new
ManagedLedgerException.ManagedLedgerFactoryClosedException());
}
}
- CompletableFuture<Void> bookkeeperFuture = new CompletableFuture<>();
- futures.add(bookkeeperFuture);
- futures.add(CompletableFuture.runAsync(() -> {
- if (isBookkeeperManaged) {
- try {
- BookKeeper bookkeeper = bookkeeperFactory.get();
- if (bookkeeper != null) {
- bookkeeper.close();
- }
- bookkeeperFuture.complete(null);
- } catch (Throwable throwable) {
- bookkeeperFuture.completeExceptionally(throwable);
- }
- } else {
- bookkeeperFuture.complete(null);
- }
- if (!ledgers.isEmpty()) {
- log.info("Force closing {} ledgers.", ledgers.size());
- //make sure all callbacks is called.
- ledgers.forEach(((ledgerName, ledgerFuture) -> {
- if (!ledgerFuture.isDone()) {
- ledgerFuture.completeExceptionally(
- new
ManagedLedgerException.ManagedLedgerFactoryClosedException());
- } else {
- ManagedLedgerImpl managedLedger =
ledgerFuture.getNow(null);
- if (managedLedger == null) {
- return;
- }
- try {
- managedLedger.close();
- } catch (Throwable throwable) {
- log.warn("[{}] Got exception when closing managed
ledger: {}", managedLedger.getName(),
- throwable);
+ CompletableFuture<BookKeeper> bookkeeperFuture = isBookkeeperManaged
+ ? bookkeeperFactory.get()
+ : CompletableFuture.completedFuture(null);
+ return bookkeeperFuture
+ .thenRun(() -> {
+ log.info("Closing {} ledgers.", ledgers.size());
+ //make sure all callbacks is called.
+ ledgers.forEach(((ledgerName, ledgerFuture) -> {
+ if (!ledgerFuture.isDone()) {
+ ledgerFuture.completeExceptionally(
+ new
ManagedLedgerException.ManagedLedgerFactoryClosedException());
+ } else {
+ ManagedLedgerImpl managedLedger =
ledgerFuture.getNow(null);
+ if (managedLedger == null) {
+ return;
+ }
+ try {
+ managedLedger.close();
+ } catch (Throwable throwable) {
+ log.warn("[{}] Got exception when closing
managed ledger: {}", managedLedger.getName(),
+ throwable);
+ }
}
- }
- }));
- }
- }));
- return FutureUtil.waitForAll(futures).thenAcceptAsync(__ -> {
- //wait for tasks in scheduledExecutor executed.
- scheduledExecutor.shutdownNow();
- entryCacheManager.clear();
- });
+ }));
+ }).thenAcceptAsync(__ -> {
+ //wait for tasks in scheduledExecutor executed.
+ scheduledExecutor.shutdownNow();
+ entryCacheManager.clear();
+ });
}
@Override
@@ -861,14 +856,14 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
asyncGetManagedLedgerInfo(managedLedgerName, new
ManagedLedgerInfoCallback() {
@Override
public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
- BookKeeper bkc = getBookKeeper();
-
- // First delete all cursors resources
- List<CompletableFuture<Void>> futures =
info.cursors.entrySet().stream()
- .map(e -> deleteCursor(bkc, managedLedgerName,
e.getKey(), e.getValue()))
- .collect(Collectors.toList());
- Futures.waitForAll(futures).thenRun(() -> {
- deleteManagedLedgerData(bkc, managedLedgerName, info,
mlConfigFuture, callback, ctx);
+ getBookKeeper().thenCompose(bk -> {
+ // First delete all cursors resources
+ List<CompletableFuture<Void>> futures =
info.cursors.entrySet().stream()
+ .map(e -> deleteCursor(bk, managedLedgerName,
e.getKey(), e.getValue()))
+ .collect(Collectors.toList());
+ return Futures.waitForAll(futures).thenApply(v -> bk);
+ }).thenAccept(bk -> {
+ deleteManagedLedgerData(bk, managedLedgerName, info,
mlConfigFuture, callback, ctx);
}).exceptionally(ex -> {
callback.deleteLedgerFailed(new
ManagedLedgerException(ex), ctx);
return null;
@@ -1053,7 +1048,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
return this.mbean;
}
- public BookKeeper getBookKeeper() {
+ public CompletableFuture<BookKeeper> getBookKeeper() {
return bookkeeperFactory.get();
}
@@ -1062,7 +1057,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
*
*/
public interface BookkeeperFactoryForCustomEnsemblePlacementPolicy {
- default BookKeeper get() {
+ default CompletableFuture<BookKeeper> get() {
return get(null);
}
@@ -1073,7 +1068,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
* @param ensemblePlacementPolicyMetadata
* @return
*/
- BookKeeper get(EnsemblePlacementPolicyConfig
ensemblePlacementPolicyMetadata);
+ CompletableFuture<BookKeeper> get(EnsemblePlacementPolicyConfig
ensemblePlacementPolicyMetadata);
}
private static final Logger log =
LoggerFactory.getLogger(ManagedLedgerFactoryImpl.class);
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
index a271d439e06..81cd94e5bf9 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
@@ -140,7 +140,7 @@ public class ManagedLedgerOfflineBacklog {
final NavigableMap<Long,
MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers) throws Exception {
String managedLedgerName = topicName.getPersistenceNamingEncoding();
MetaStore store = factory.getMetaStore();
- BookKeeper bk = factory.getBookKeeper();
+
final CountDownLatch mlMetaCounter = new CountDownLatch(1);
store.getManagedLedgerInfo(managedLedgerName, false /* createIfMissing
*/,
@@ -180,12 +180,16 @@ public class ManagedLedgerOfflineBacklog {
if (log.isDebugEnabled()) {
log.debug("[{}] Opening ledger {}",
managedLedgerName, id);
}
- try {
- bk.asyncOpenLedgerNoRecovery(id, digestType,
password, opencb, null);
- } catch (Exception e) {
- log.warn("[{}] Failed to open ledger {}: {}",
managedLedgerName, id, e);
- mlMetaCounter.countDown();
- }
+
+ factory.getBookKeeper()
+ .thenAccept(bk -> {
+ bk.asyncOpenLedgerNoRecovery(id,
digestType, password, opencb, null);
+ }).exceptionally(ex -> {
+ log.warn("[{}] Failed to open ledger
{}: {}", managedLedgerName, id, ex);
+ opencb.openComplete(-1, null, null);
+ mlMetaCounter.countDown();
+ return null;
+ });
} else {
log.warn("[{}] Ledger list empty",
managedLedgerName);
mlMetaCounter.countDown();
@@ -217,7 +221,7 @@ public class ManagedLedgerOfflineBacklog {
}
String managedLedgerName = topicName.getPersistenceNamingEncoding();
MetaStore store = factory.getMetaStore();
- BookKeeper bk = factory.getBookKeeper();
+ BookKeeper bk = factory.getBookKeeper().get();
final CountDownLatch allCursorsCounter = new CountDownLatch(1);
final long errorInReadingCursor = -1;
ConcurrentOpenHashMap<String, Long> ledgerRetryMap =
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
index 95923baac02..5ab1a01838d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
@@ -19,9 +19,9 @@
package org.apache.pulsar.broker;
import io.netty.channel.EventLoopGroup;
-import java.io.IOException;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -31,13 +31,16 @@ import
org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
* Provider of a new BookKeeper client instance.
*/
public interface BookKeeperClientFactory {
- BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
- Optional<Class<? extends EnsemblePlacementPolicy>>
ensemblePlacementPolicyClass,
- Map<String, Object> ensemblePlacementPolicyProperties)
throws IOException;
+ CompletableFuture<BookKeeper> create(ServiceConfiguration conf,
MetadataStoreExtended store,
+ EventLoopGroup eventLoopGroup,
+ Optional<Class<? extends
EnsemblePlacementPolicy>> policyClass,
+ Map<String, Object>
ensemblePlacementPolicyProperties);
+
+ CompletableFuture<BookKeeper> create(ServiceConfiguration conf,
MetadataStoreExtended store,
+ EventLoopGroup eventLoopGroup,
+ Optional<Class<? extends
EnsemblePlacementPolicy>> policyClass,
+ Map<String, Object>
ensemblePlacementPolicyProperties,
+ StatsLogger statsLogger);
- BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
- Optional<Class<? extends EnsemblePlacementPolicy>>
ensemblePlacementPolicyClass,
- Map<String, Object> ensemblePlacementPolicyProperties,
- StatsLogger statsLogger) throws IOException;
void close();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index e5293cee24e..45299d9ed05 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -29,6 +29,7 @@ import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
@@ -53,19 +54,19 @@ import
org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver;
public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
@Override
- public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended
store,
- EventLoopGroup eventLoopGroup,
- Optional<Class<? extends
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
- Map<String, Object> properties) throws
IOException {
- return create(conf, store, eventLoopGroup,
ensemblePlacementPolicyClass, properties,
+ public CompletableFuture<BookKeeper> create(ServiceConfiguration conf,
MetadataStoreExtended store,
+ EventLoopGroup eventLoopGroup,
+ Optional<Class<? extends
EnsemblePlacementPolicy>> policyClass,
+ Map<String, Object>
properties) {
+ return create(conf, store, eventLoopGroup, policyClass, properties,
NullStatsLogger.INSTANCE);
}
@Override
- public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended
store,
+ public CompletableFuture<BookKeeper> create(ServiceConfiguration conf,
MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
Optional<Class<? extends
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
- Map<String, Object> properties, StatsLogger
statsLogger) throws IOException {
+ Map<String, Object> properties, StatsLogger
statsLogger) {
PulsarMetadataClientDriver.init();
ClientConfiguration bkConf = createBkClientConfiguration(store, conf);
@@ -77,11 +78,14 @@ public class BookKeeperClientFactoryImpl implements
BookKeeperClientFactory {
} else {
setDefaultEnsemblePlacementPolicy(bkConf, conf, store);
}
- try {
- return getBookKeeperBuilder(conf, eventLoopGroup, statsLogger,
bkConf).build();
- } catch (InterruptedException | BKException e) {
- throw new IOException(e);
- }
+
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ return getBookKeeperBuilder(conf, eventLoopGroup, statsLogger,
bkConf).build();
+ } catch (InterruptedException | BKException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
}
@VisibleForTesting
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 8861b12f0c1..6ed95f167a1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -18,12 +18,14 @@
*/
package org.apache.pulsar.broker;
+import com.github.benmanes.caffeine.cache.AsyncCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -48,8 +50,8 @@ public class ManagedLedgerClientFactory implements
ManagedLedgerStorage {
private ManagedLedgerFactory managedLedgerFactory;
private BookKeeper defaultBkClient;
- private final Map<EnsemblePlacementPolicyConfig, BookKeeper>
- bkEnsemblePolicyToBkClientMap = new ConcurrentHashMap<>();
+ private final AsyncCache<EnsemblePlacementPolicyConfig, BookKeeper>
+ bkEnsemblePolicyToBkClientMap = Caffeine.newBuilder().buildAsync();
private StatsProvider statsProvider = new NullStatsProvider();
public void initialize(ServiceConfiguration conf, MetadataStoreExtended
metadataStore,
@@ -89,27 +91,20 @@ public class ManagedLedgerClientFactory implements
ManagedLedgerStorage {
StatsLogger statsLogger =
statsProvider.getStatsLogger("pulsar_managedLedger_client");
this.defaultBkClient =
- bookkeeperProvider.create(conf, metadataStore, eventLoopGroup,
Optional.empty(), null, statsLogger);
+ bookkeeperProvider.create(conf, metadataStore, eventLoopGroup,
Optional.empty(), null, statsLogger)
+ .get();
BookkeeperFactoryForCustomEnsemblePlacementPolicy bkFactory = (
EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig)
-> {
- BookKeeper bkClient = null;
- // find or create bk-client in cache for a specific
ensemblePlacementPolicy
- if (ensemblePlacementPolicyConfig != null &&
ensemblePlacementPolicyConfig.getPolicyClass() != null) {
- bkClient =
bkEnsemblePolicyToBkClientMap.computeIfAbsent(ensemblePlacementPolicyConfig,
(key) -> {
- try {
- return bookkeeperProvider.create(conf, metadataStore,
eventLoopGroup,
-
Optional.ofNullable(ensemblePlacementPolicyConfig.getPolicyClass()),
- ensemblePlacementPolicyConfig.getProperties(),
statsLogger);
- } catch (Exception e) {
- log.error("Failed to initialize bk-client for policy
{}, properties {}",
- ensemblePlacementPolicyConfig.getPolicyClass(),
- ensemblePlacementPolicyConfig.getProperties(),
e);
- }
- return this.defaultBkClient;
- });
+ if (ensemblePlacementPolicyConfig == null ||
ensemblePlacementPolicyConfig.getPolicyClass() == null) {
+ return CompletableFuture.completedFuture(defaultBkClient);
}
- return bkClient != null ? bkClient : defaultBkClient;
+
+ // find or create bk-client in cache for a specific
ensemblePlacementPolicy
+ return
bkEnsemblePolicyToBkClientMap.get(ensemblePlacementPolicyConfig,
+ (config, executor) -> bookkeeperProvider.create(conf,
metadataStore, eventLoopGroup,
+
Optional.ofNullable(ensemblePlacementPolicyConfig.getPolicyClass()),
+ ensemblePlacementPolicyConfig.getProperties(),
statsLogger));
};
try {
@@ -136,7 +131,7 @@ public class ManagedLedgerClientFactory implements
ManagedLedgerStorage {
@VisibleForTesting
public Map<EnsemblePlacementPolicyConfig, BookKeeper>
getBkEnsemblePolicyToBookKeeperMap() {
- return bkEnsemblePolicyToBkClientMap;
+ return bkEnsemblePolicyToBkClientMap.synchronous().asMap();
}
@Override
@@ -164,7 +159,7 @@ public class ManagedLedgerClientFactory implements
ManagedLedgerStorage {
// factory, however that might be introducing more unknowns.
log.warn("Encountered exceptions on closing bookkeeper
client", ree);
}
- bkEnsemblePolicyToBkClientMap.forEach((policy, bk) -> {
+
bkEnsemblePolicyToBkClientMap.synchronous().asMap().forEach((policy, bk) -> {
try {
if (bk != null) {
bk.close();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index e99f39b382f..8dcfe8d39a8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -107,7 +107,7 @@ public class BookkeeperBucketSnapshotStorage implements
BucketSnapshotStorage {
pulsar.getIoEventLoopGroup(),
Optional.empty(),
null
- );
+ ).get();
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index acdd906f6b8..99f0249b304 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -110,7 +110,7 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
pulsar.getIoEventLoopGroup(),
Optional.empty(),
null
- );
+ ).join();
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index 3225f7294d5..7d35c2c0f7b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -165,7 +165,7 @@ public class CompactorTool {
new DefaultThreadFactory("compactor-io"));
@Cleanup
- BookKeeper bk = bkClientFactory.create(brokerConfig, store,
eventLoopGroup, Optional.empty(), null);
+ BookKeeper bk = bkClientFactory.create(brokerConfig, store,
eventLoopGroup, Optional.empty(), null).get();
@Cleanup
PulsarClient pulsar = createClient(brokerConfig);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
index 6d65687a501..887e35e2774 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
@@ -19,9 +19,9 @@
package org.apache.pulsar.broker;
import io.netty.channel.EventLoopGroup;
-import java.io.IOException;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
@@ -51,19 +51,19 @@ public class MockedBookKeeperClientFactory implements
BookKeeperClientFactory {
}
@Override
- public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended
store,
- EventLoopGroup eventLoopGroup,
- Optional<Class<? extends
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
- Map<String, Object> properties) throws
IOException {
- return mockedBk;
+ public CompletableFuture<BookKeeper> create(ServiceConfiguration conf,
MetadataStoreExtended store,
+ EventLoopGroup eventLoopGroup,
+ Optional<Class<? extends
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+ Map<String, Object>
properties) {
+ return CompletableFuture.completedFuture(mockedBk);
}
@Override
- public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended
store,
+ public CompletableFuture<BookKeeper> create(ServiceConfiguration conf,
MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
Optional<Class<? extends
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
- Map<String, Object> properties, StatsLogger
statsLogger) throws IOException {
- return mockedBk;
+ Map<String, Object> properties, StatsLogger
statsLogger) {
+ return CompletableFuture.completedFuture(mockedBk);
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockBookKeeperClientFactory.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockBookKeeperClientFactory.java
index fd457687323..5f02fd7af48 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockBookKeeperClientFactory.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockBookKeeperClientFactory.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.testcontext;
import io.netty.channel.EventLoopGroup;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -39,21 +40,21 @@ class MockBookKeeperClientFactory implements
BookKeeperClientFactory {
}
@Override
- public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended
store,
- EventLoopGroup eventLoopGroup,
- Optional<Class<? extends
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
- Map<String, Object> properties) {
+ public CompletableFuture<BookKeeper> create(ServiceConfiguration conf,
MetadataStoreExtended store,
+ EventLoopGroup eventLoopGroup,
+ Optional<Class<? extends
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+ Map<String, Object> properties) {
// Always return the same instance (so that we don't loose the mock BK
content on broker restart
- return mockBookKeeper;
+ return CompletableFuture.completedFuture(mockBookKeeper);
}
@Override
- public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended
store,
+ public CompletableFuture<BookKeeper> create(ServiceConfiguration conf,
MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
Optional<Class<? extends
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> properties, StatsLogger
statsLogger) {
// Always return the same instance (so that we don't loose the mock BK
content on broker restart
- return mockBookKeeper;
+ return CompletableFuture.completedFuture(mockBookKeeper);
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index e955a433ad5..3cca85aa2f1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -163,7 +163,7 @@ public class CompactedTopicTest extends
MockedPulsarServiceBaseTest {
public void testEntryLookup() throws Exception {
@Cleanup
BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
- this.conf, null, null, Optional.empty(), null);
+ this.conf, null, null, Optional.empty(), null).get();
Triple<Long, List<Pair<MessageIdData, Long>>, List<Pair<MessageIdData,
Long>>> compactedLedgerData
= buildCompactedLedger(bk, 500);
@@ -219,7 +219,7 @@ public class CompactedTopicTest extends
MockedPulsarServiceBaseTest {
public void testCleanupOldCompactedTopicLedger() throws Exception {
@Cleanup
BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
- this.conf, null, null, Optional.empty(), null);
+ this.conf, null, null, Optional.empty(), null).get();
LedgerHandle oldCompactedLedger = bk.createLedger(1, 1,
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
@@ -849,7 +849,7 @@ public class CompactedTopicTest extends
MockedPulsarServiceBaseTest {
public void
testCompactWithConcurrentGetCompactionHorizonAndCompactedTopicContext() throws
Exception {
@Cleanup
BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
- this.conf, null, null, Optional.empty(), null);
+ this.conf, null, null, Optional.empty(), null).get();
Mockito.doAnswer(invocation -> {
Thread.sleep(1500);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
index 98bf2b819c2..ac1ba6bc814 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
@@ -78,7 +78,7 @@ public class CompactionRetentionTest extends
MockedPulsarServiceBaseTest {
compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
- bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null,
Optional.empty(), null);
+ bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null,
Optional.empty(), null).get();
compactor = new TwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index f0010096b1e..081831b0300 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -123,7 +123,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
- bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null,
Optional.empty(), null);
+ bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null,
Optional.empty(), null).get();
compactor = new TwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index debc3dd5e3f..16945a60f5d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -100,7 +100,7 @@ public class CompactorTest extends
MockedPulsarServiceBaseTest {
compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
bk = pulsar.getBookKeeperClientFactory().create(
- this.conf, null, null, Optional.empty(), null);
+ this.conf, null, null, Optional.empty(), null).get();
compactor = new TwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
index b3a48f40547..91402168108 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
@@ -155,7 +155,7 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
- bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null,
Optional.empty(), null);
+ bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null,
Optional.empty(), null).get();
schema = Schema.JSON(ServiceUnitStateData.class);
strategy = new ServiceUnitStateCompactionStrategy();
strategy.checkBrokers(false);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
index d84d1ccc9ea..ba77ce5bd9d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
@@ -72,7 +72,7 @@ public class TopicCompactionServiceTest extends
MockedPulsarServiceBaseTest {
compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
bk = pulsar.getBookKeeperClientFactory().create(
- this.conf, null, null, Optional.empty(), null);
+ this.conf, null, null, Optional.empty(), null).get();
compactor = new TwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
}