This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5cff169 [test] add getters and setters to PulsarService &
BrokerService (#4709)
5cff169 is described below
commit 5cff1691fd4c4900befd8ad4e21bec1023a0c936
Author: Sijie Guo <[email protected]>
AuthorDate: Sun Jul 21 14:22:31 2019 +0800
[test] add getters and setters to PulsarService & BrokerService (#4709)
*Motivation*
When using PulsarService or BrokerService for testing, it might require
accessing
the components in PulsarService and BrokerService. This change is adding
setters
and getters to access the components in PulsarService & BrokerService
---
.../org/apache/pulsar/broker/PulsarService.java | 58 ++++++----------------
.../pulsar/broker/service/BrokerService.java | 18 ++++---
2 files changed, 25 insertions(+), 51 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 2dddcd5..0d65b22 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -46,6 +46,9 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -124,6 +127,8 @@ import org.slf4j.LoggerFactory;
* Main class for Pulsar broker service
*/
+@Getter(AccessLevel.PUBLIC)
+@Setter(AccessLevel.PROTECTED)
public class PulsarService implements AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(PulsarService.class);
private ServiceConfiguration config = null;
@@ -351,7 +356,7 @@ public class PulsarService implements AutoCloseable {
if (!config.getBrokerServicePort().isPresent() &&
!config.getBrokerServicePortTls().isPresent()) {
throw new
IllegalArgumentException("brokerServicePort/brokerServicePortTls must be
present");
}
-
+
// Now we are ready to start services
localZooKeeperConnectionProvider = new
LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
config.getZookeeperServers(),
config.getZooKeeperSessionTimeoutMillis());
@@ -459,7 +464,7 @@ public class PulsarService implements AutoCloseable {
+ (config.getBrokerServicePort().isPresent() ? "broker
url= " + brokerServiceUrl : "")
+ (config.getBrokerServicePortTls().isPresent() ? "broker
url= " + brokerServiceUrlTls : "");
LOG.info("messaging service is ready");
-
+
LOG.info("messaging service is ready, {}, cluster={}, configs={}",
bootstrapMessage,
config.getClusterName(),
ReflectionToStringBuilder.toString(config));
} catch (Exception e) {
@@ -470,7 +475,7 @@ public class PulsarService implements AutoCloseable {
}
}
- private void startLeaderElectionService() {
+ protected void startLeaderElectionService() {
this.leaderElectionService = new LeaderElectionService(this, new
LeaderListener() {
@Override
public synchronized void brokerIsTheLeaderNow() {
@@ -504,7 +509,7 @@ public class PulsarService implements AutoCloseable {
leaderElectionService.start();
}
- private void acquireSLANamespace() {
+ protected void acquireSLANamespace() {
try {
// Namespace not created hence no need to unload it
String nsName =
NamespaceService.getSLAMonitorNamespace(getAdvertisedAddress(), config);
@@ -553,7 +558,7 @@ public class PulsarService implements AutoCloseable {
}
}
- private void startZkCacheService() throws PulsarServerException {
+ protected void startZkCacheService() throws PulsarServerException {
LOG.info("starting configuration cache service");
@@ -573,7 +578,7 @@ public class PulsarService implements AutoCloseable {
this.localZkCacheService = new
LocalZooKeeperCacheService(getLocalZkCache(), this.configurationCacheService);
}
- private void startNamespaceService() throws PulsarServerException {
+ protected void startNamespaceService() throws PulsarServerException {
LOG.info("Starting name space service, bootstrap namespaces=" +
config.getBootstrapNamespaces());
@@ -584,7 +589,7 @@ public class PulsarService implements AutoCloseable {
return () -> new NamespaceService(PulsarService.this);
}
- private void startLoadManagementService() throws PulsarServerException {
+ protected void startLoadManagementService() throws PulsarServerException {
LOG.info("Starting load management service ...");
this.loadManager.get().start();
@@ -708,7 +713,7 @@ public class PulsarService implements AutoCloseable {
public ManagedLedgerClientFactory getManagedLedgerClientFactory() {
return managedLedgerClientFactory;
}
-
+
public LedgerOffloader getManagedLedgerOffloader() {
return offloader;
}
@@ -851,7 +856,7 @@ public class PulsarService implements AutoCloseable {
builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
}
-
+
// most of the admin request requires to make zk-call so, keep
the max read-timeout based on
// zk-operation timeout
builder.readTimeout(conf.getZooKeeperOperationTimeoutSeconds(),
TimeUnit.SECONDS);
@@ -931,49 +936,14 @@ public class PulsarService implements AutoCloseable {
return String.format("https://%s:%d", host, port);
}
- public String getBindAddress() {
- return bindAddress;
- }
-
- public String getAdvertisedAddress() {
- return advertisedAddress;
- }
-
public String getSafeWebServiceAddress() {
return webServiceAddress != null ? webServiceAddress :
webServiceAddressTls;
}
-
- public String getWebServiceAddress() {
- return webServiceAddress;
- }
-
- public String getWebServiceAddressTls() {
- return webServiceAddressTls;
- }
public String getSafeBrokerServiceUrl() {
return brokerServiceUrl != null ? brokerServiceUrl :
brokerServiceUrlTls;
}
-
- public String getBrokerServiceUrl() {
- return brokerServiceUrl;
- }
- public String getBrokerServiceUrlTls() {
- return brokerServiceUrlTls;
- }
-
- public AtomicReference<LoadManager> getLoadManager() {
- return loadManager;
- }
-
- public String getBrokerVersion() {
- return brokerVersion;
- }
-
- public SchemaRegistryService getSchemaRegistryService() {
- return schemaRegistryService;
- }
private void startWorkerService(AuthenticationService
authenticationService,
AuthorizationService authorizationService)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 077e737..1da2cf8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -63,6 +63,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
@@ -132,10 +135,11 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@Getter(AccessLevel.PUBLIC)
+@Setter(AccessLevel.PROTECTED)
public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies> {
private static final Logger log =
LoggerFactory.getLogger(BrokerService.class);
@@ -174,7 +178,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
private DistributedIdGenerator producerNameGenerator;
- private final static String producerNameGeneratorPath =
"/counters/producer-name";
+ public final static String producerNameGeneratorPath =
"/counters/producer-name";
private final BacklogQuotaManager backlogQuotaManager;
@@ -334,7 +338,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
ClientCnxnAspect.registerExecutor(pulsar.getExecutor());
}
- void startStatsUpdater(int statsUpdateInitailDelayInSecs, int
statsUpdateFrequencyInSecs) {
+ protected void startStatsUpdater(int statsUpdateInitailDelayInSecs, int
statsUpdateFrequencyInSecs) {
statsUpdater.scheduleAtFixedRate(safeRun(this::updateRates),
statsUpdateInitailDelayInSecs, statsUpdateFrequencyInSecs,
TimeUnit.SECONDS);
@@ -342,7 +346,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
updateRates();
}
- void startInactivityMonitor() {
+ protected void startInactivityMonitor() {
if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled())
{
int interval =
pulsar().getConfiguration().getBrokerServicePurgeInactiveFrequencyInSeconds();
inactivityMonitor.scheduleAtFixedRate(safeRun(() ->
checkGC(interval)), interval, interval,
@@ -364,13 +368,13 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
}
}
- void startMessageExpiryMonitor() {
+ protected void startMessageExpiryMonitor() {
int interval =
pulsar().getConfiguration().getMessageExpiryCheckIntervalInMinutes();
messageExpiryMonitor.scheduleAtFixedRate(safeRun(this::checkMessageExpiry),
interval, interval,
TimeUnit.MINUTES);
}
- void startCompactionMonitor() {
+ protected void startCompactionMonitor() {
int interval =
pulsar().getConfiguration().getBrokerServiceCompactionMonitorIntervalInSeconds();
if (interval > 0) {
compactionMonitor.scheduleAtFixedRate(safeRun(() ->
checkCompaction()),
@@ -378,7 +382,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
}
}
- void startBacklogQuotaChecker() {
+ protected void startBacklogQuotaChecker() {
if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) {
final int interval =
pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds();
log.info("Scheduling a thread to check backlog quota after [{}]
seconds in background", interval);