merlimat closed pull request #1804: Ensure BookKeeperClientFactory is only
instantiated once in PulsarService
URL: https://github.com/apache/incubator-pulsar/pull/1804
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 0677800c66..c1218540b2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -33,8 +34,8 @@
public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
- private ZooKeeperCache rackawarePolicyZkCache;
- private ZooKeeperCache clientIsolationZkCache;
+ private final AtomicReference<ZooKeeperCache> rackawarePolicyZkCache = new
AtomicReference<>();
+ private final AtomicReference<ZooKeeperCache> clientIsolationZkCache = new
AtomicReference<>();
@Override
public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient)
throws IOException {
@@ -67,9 +68,14 @@ public BookKeeper create(ServiceConfiguration conf,
ZooKeeper zkClient) throws I
bkConf.setEnsemblePlacementPolicy(RackawareEnsemblePlacementPolicy.class);
bkConf.setProperty(RackawareEnsemblePlacementPolicy.REPP_DNS_RESOLVER_CLASS,
ZkBookieRackAffinityMapping.class.getName());
- this.rackawarePolicyZkCache = new ZooKeeperCache(zkClient) {
+
+ ZooKeeperCache zkc = new ZooKeeperCache(zkClient) {
};
- bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE,
this.rackawarePolicyZkCache);
+ if (!rackawarePolicyZkCache.compareAndSet(null, zkc)) {
+ zkc.stop();
+ }
+
+ bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE,
this.rackawarePolicyZkCache.get());
}
if (conf.getBookkeeperClientIsolationGroups() != null &&
!conf.getBookkeeperClientIsolationGroups().isEmpty()) {
@@ -77,8 +83,12 @@ public BookKeeper create(ServiceConfiguration conf,
ZooKeeper zkClient) throws I
bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
conf.getBookkeeperClientIsolationGroups());
if (bkConf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) == null) {
- this.clientIsolationZkCache = new ZooKeeperCache(zkClient) {
+ ZooKeeperCache zkc = new ZooKeeperCache(zkClient) {
};
+
+ if (!clientIsolationZkCache.compareAndSet(null, zkc)) {
+ zkc.stop();
+ }
bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE,
this.clientIsolationZkCache);
}
}
@@ -91,11 +101,11 @@ public BookKeeper create(ServiceConfiguration conf,
ZooKeeper zkClient) throws I
}
public void close() {
- if (this.rackawarePolicyZkCache != null) {
- this.rackawarePolicyZkCache.stop();
+ if (this.rackawarePolicyZkCache.get() != null) {
+ this.rackawarePolicyZkCache.get().stop();
}
- if (this.clientIsolationZkCache != null) {
- this.clientIsolationZkCache.stop();
+ if (this.clientIsolationZkCache.get() != null) {
+ this.clientIsolationZkCache.get().stop();
}
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5f6dbca934..3168982e9b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -320,7 +320,7 @@ public void start() throws PulsarServerException {
// Initialize and start service to access configuration repository.
this.startZkCacheService();
- this.bkClientFactory = getBookKeeperClientFactory();
+ this.bkClientFactory = newBookKeeperClientFactory();
managedLedgerClientFactory = new
ManagedLedgerClientFactory(config, getZkClient(), bkClientFactory);
this.brokerService = new BrokerService(this);
@@ -695,10 +695,14 @@ public ZooKeeperClientFactory getZooKeeperClientFactory()
{
return zkClientFactory;
}
- public BookKeeperClientFactory getBookKeeperClientFactory() {
+ public BookKeeperClientFactory newBookKeeperClientFactory() {
return new BookKeeperClientFactoryImpl();
}
+ public BookKeeperClientFactory getBookKeeperClientFactory() {
+ return bkClientFactory;
+ }
+
protected synchronized ScheduledExecutorService getCompactorExecutor() {
if (this.compactorExecutor == null) {
compactorExecutor = Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("compaction"));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 23592b50db..bc98efa557 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -201,7 +201,7 @@ protected PulsarService startBroker(ServiceConfiguration
conf) throws Exception
protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
// Override default providers with mocked ones
doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
-
doReturn(mockBookKeeperClientFactory).when(pulsar).getBookKeeperClientFactory();
+
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new
NamespaceService(pulsar));
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index 542c5d77e2..30e455412d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -259,7 +259,7 @@ private void setupEnv(boolean enableFilter, String
minApiVersion, boolean allowU
config.setZookeeperServers("localhost:2181");
pulsar = spy(new PulsarService(config));
doReturn(new
MockedZooKeeperClientFactoryImpl()).when(pulsar).getZooKeeperClientFactory();
- doReturn(new
MockedBookKeeperClientFactory()).when(pulsar).getBookKeeperClientFactory();
+ doReturn(new
MockedBookKeeperClientFactory()).when(pulsar).newBookKeeperClientFactory();
pulsar.start();
try {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services