This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9c3f2a7 [conf] Add annotations for documenting broker configuration
settings (#3113)
9c3f2a7 is described below
commit 9c3f2a77bb352c073d2dc36094b42dc3c9918c3b
Author: Sijie Guo <[email protected]>
AuthorDate: Thu Dec 20 18:22:29 2018 +0800
[conf] Add annotations for documenting broker configuration settings
(#3113)
*Motivation*
This change is adding annotations to broker configuration for generating
broker configuration file.
---
.../apache/pulsar/broker/ServiceConfiguration.java | 1036 +++++++++++++++-----
.../configuration/PulsarConfigurationLoader.java | 7 +-
2 files changed, 784 insertions(+), 259 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 3939ceb..75ee5d2 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -30,6 +30,7 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.common.configuration.Category;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -41,484 +42,996 @@ import
org.apache.pulsar.common.policies.data.BacklogQuota;
@Setter
public class ServiceConfiguration implements PulsarConfiguration {
+ @Category
+ private static final String CATEGORY_SERVER = "Server";
+ @Category
+ private static final String CATEGORY_STORAGE_BK = "Storage (BookKeeper)";
+ @Category
+ private static final String CATEGORY_STORAGE_ML = "Storage (Managed
Ledger)";
+ @Category
+ private static final String CATEGORY_STORAGE_OFFLOADING = "Storage (Ledger
Offloading)";
+ @Category
+ private static final String CATEGORY_POLICIES = "Policies";
+ @Category
+ private static final String CATEGORY_WEBSOCKET = "WebSocket";
+ @Category
+ private static final String CATEGORY_SCHEMA = "Schema";
+ @Category
+ private static final String CATEGORY_METRICS = "Metrics";
+ @Category
+ private static final String CATEGORY_REPLICATION = "Replication";
+ @Category
+ private static final String CATEGORY_LOAD_BALANCER = "Load Balancer";
+ @Category
+ private static final String CATEGORY_FUNCTIONS = "Functions";
+ @Category
+ private static final String CATEGORY_TLS = "TLS";
+ @Category
+ private static final String CATEGORY_AUTHENTICATION = "Authentication";
+ @Category
+ private static final String CATEGORY_AUTHORIZATION = "Authorization";
+ @Category
+ private static final String CATEGORY_TOKEN_AUTH = "Token Authentication
Provider";
+ @Category
+ private static final String CATEGORY_HTTP = "HTTP";
+
/***** --- pulsar configuration --- ****/
- // Zookeeper quorum connection string
- @FieldContext(required = true)
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ required = true,
+ doc = "The Zookeeper quorum connection string (as a comma-separated
list)"
+ )
private String zookeeperServers;
- // Global Zookeeper quorum connection string
@Deprecated
- @FieldContext(required = false)
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ required = false,
+ deprecated = true,
+ doc = "Global Zookeeper quorum connection string (as a comma-separated
list)."
+ + " Deprecated in favor of using `configurationStoreServers`"
+ )
private String globalZookeeperServers;
- // Configuration Store connection string
- @FieldContext(required = false)
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ required = false,
+ doc = "Configuration store connection string (as a comma-separated
list)"
+ )
private String configurationStoreServers;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "The port for serving binary protobuf requests"
+ )
private Integer brokerServicePort = 6650;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "The port for serving tls secured binary protobuf requests"
+ )
private Integer brokerServicePortTls = null;
- // Port to use to server HTTP request
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "The port for serving http requests"
+ )
private Integer webServicePort = 8080;
- // Port to use to server HTTPS request
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "The port for serving https requests"
+ )
private Integer webServicePortTls = null;
- // Hostname or IP address the service binds on.
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Hostname or IP address the service binds on"
+ )
private String bindAddress = "0.0.0.0";
- // Controls which hostname is advertised to the discovery service via
ZooKeeper.
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Hostname or IP address the service advertises to the outside
world."
+ + " If not set, the value of
`InetAddress.getLocalHost().getHostname()` is used."
+ )
private String advertisedAddress;
- // Number of threads to use for Netty IO
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Number of threads to use for Netty IO."
+ + " Default is set to `2 *
Runtime.getRuntime().availableProcessors()`"
+ )
private int numIOThreads = 2 * Runtime.getRuntime().availableProcessors();
- // Enable the WebSocket API service
+ @FieldContext(
+ category = CATEGORY_WEBSOCKET,
+ doc = "Enable the WebSocket API service in broker"
+ )
private boolean webSocketServiceEnabled = false;
- // Flag to control features that are meant to be used when running in
standalone mode
+ @FieldContext(
+ category = CATEGORY_WEBSOCKET,
+ doc = "Flag indicates whether to run broker in standalone mode"
+ )
private boolean isRunningStandalone = false;
- // Name of the cluster to which this broker belongs to
- @FieldContext(required = true)
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ required = true,
+ doc = "Name of the cluster to which this broker belongs to"
+ )
private String clusterName;
- // Enable cluster's failure-domain which can distribute brokers into
logical region
- @FieldContext(dynamic = true)
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ dynamic = true,
+ doc = "Enable cluster's failure-domain which can distribute brokers
into logical region"
+ )
private boolean failureDomainsEnabled = false;
- // Zookeeper session timeout in milliseconds
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "ZooKeeper session timeout in milliseconds"
+ )
private long zooKeeperSessionTimeoutMillis = 30000;
- // Time to wait for broker graceful shutdown. After this time elapses, the
- // process will be killed
- @FieldContext(dynamic = true)
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ dynamic = true,
+ doc = "Time to wait for broker graceful shutdown. After this time
elapses, the process will be killed"
+ )
private long brokerShutdownTimeoutMs = 60000;
- // Enable backlog quota check. Enforces action on topic when the quota is
- // reached
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Enable backlog quota check. Enforces actions on topic when the
quota is reached"
+ )
private boolean backlogQuotaCheckEnabled = true;
- // How often to check for topics that have reached the quota
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "How often to check for topics that have reached the quota."
+ + " It only takes effects when `backlogQuotaCheckEnabled` is true"
+ )
private int backlogQuotaCheckIntervalInSeconds = 60;
- // Default per-topic backlog quota limit
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Default per-topic backlog quota limit. Increase it if you want
to allow larger msg backlog"
+ )
private long backlogQuotaDefaultLimitGB = 50;
- //Default backlog quota retention policy. Default is producer_request_hold
- //'producer_request_hold' Policy which holds producer's send request until
the resource becomes available (or holding times out)
- //'producer_exception' Policy which throws
javax.jms.ResourceAllocationException to the producer
- //'consumer_backlog_eviction' Policy which evicts the oldest message from
the slowest consumer's backlog
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Default backlog quota retention policy. Default is
producer_request_hold\n\n"
+ + "'producer_request_hold' Policy which holds producer's send
request until the"
+ + "resource becomes available (or holding times out)\n"
+ + "'producer_exception' Policy which throws
javax.jms.ResourceAllocationException to the producer\n"
+ + "'consumer_backlog_eviction' Policy which evicts the oldest
message from the slowest consumer's backlog"
+ )
private BacklogQuota.RetentionPolicy backlogQuotaDefaultRetentionPolicy =
BacklogQuota.RetentionPolicy.producer_request_hold;
- // Enable the deletion of inactive topics
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Enable the deletion of inactive topics"
+ )
private boolean brokerDeleteInactiveTopicsEnabled = true;
- // How often to check for inactive topics
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "How often to check for inactive topics"
+ )
private long brokerDeleteInactiveTopicsFrequencySeconds = 60;
- // How frequently to proactively check and purge expired messages
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "How frequently to proactively check and purge expired messages"
+ )
private int messageExpiryCheckIntervalInMinutes = 5;
- // How long to delay rewinding cursor and dispatching messages when active
consumer is changed
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "How long to delay rewinding cursor and dispatching messages
when active consumer is changed"
+ )
private int activeConsumerFailoverDelayTimeMillis = 1000;
- // How long to delete inactive subscriptions from last consuming
- // When it is 0, inactive subscriptions are not deleted automatically
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "How long to delete inactive subscriptions from last consuming."
+ + " When it is 0, inactive subscriptions are not deleted
automatically"
+ )
private long subscriptionExpirationTimeMinutes = 0;
- // How frequently to proactively check and purge expired subscription
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "How frequently to proactively check and purge expired
subscription"
+ )
private long subscriptionExpiryCheckIntervalInMinutes = 5;
- // Set the default behavior for message deduplication in the broker
- // This can be overridden per-namespace. If enabled, broker will reject
- // messages that were already stored in the topic
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Set the default behavior for message deduplication in the
broker.\n\n"
+ + "This can be overridden per-namespace. If enabled, broker will
reject"
+ + " messages that were already stored in the topic"
+ )
private boolean brokerDeduplicationEnabled = false;
- // Maximum number of producer information that it's going to be
- // persisted for deduplication purposes
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Maximum number of producer information that it's going to be
persisted for deduplication purposes"
+ )
private int brokerDeduplicationMaxNumberOfProducers = 10000;
- // Number of entries after which a dedup info snapshot is taken.
- // A bigger interval will lead to less snapshots being taken though it
would
- // increase the topic recovery time, when the entries published after the
- // snapshot need to be replayed
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Number of entries after which a dedup info snapshot is
taken.\n\n"
+ + "A bigger interval will lead to less snapshots being taken
though it would"
+ + " increase the topic recovery time, when the entries published
after the"
+ + " snapshot need to be replayed"
+ )
private int brokerDeduplicationEntriesInterval = 1000;
- // Time of inactivity after which the broker will discard the
deduplication information
- // relative to a disconnected producer. Default is 6 hours.
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Time of inactivity after which the broker will discard the
deduplication information"
+ + " relative to a disconnected producer. Default is 6 hours.")
private int brokerDeduplicationProducerInactivityTimeoutMinutes = 360;
- // When a namespace is created without specifying the number of bundle,
this
- // value will be used as the default
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "When a namespace is created without specifying the number of
bundle, this"
+ + " value will be used as the default")
private int defaultNumberOfNamespaceBundles = 4;
- // Enable check for minimum allowed client library version
- @FieldContext(dynamic = true)
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ dynamic = true,
+ doc = "Enable check for minimum allowed client library version"
+ )
private boolean clientLibraryVersionCheckEnabled = false;
- // Path for the file used to determine the rotation status for the broker
- // when responding to service discovery health checks
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Path for the file used to determine the rotation status for the
broker"
+ + " when responding to service discovery health checks")
private String statusFilePath;
- // Max number of unacknowledged messages allowed to receive messages by a
consumer on a shared subscription. Broker
- // will stop sending messages to consumer once, this limit reaches until
consumer starts acknowledging messages back
- // and unack count reaches to maxUnackedMessagesPerConsumer/2 Using a
value of 0, is disabling unackedMessage-limit
- // check and consumer can receive messages without any restriction
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Max number of unacknowledged messages allowed to receive
messages by a consumer on"
+ + " a shared subscription.\n\n Broker will stop sending messages
to consumer once,"
+ + " this limit reaches until consumer starts acknowledging
messages back and unack count"
+ + " reaches to `maxUnackedMessagesPerConsumer/2`. Using a value of
0, it is disabling "
+ + " unackedMessage-limit check and consumer can receive messages
without any restriction")
private int maxUnackedMessagesPerConsumer = 50000;
- // Max number of unacknowledged messages allowed per shared subscription.
Broker will stop dispatching messages to
- // all consumers of the subscription once this limit reaches until
consumer starts acknowledging messages back and
- // unack count reaches to limit/2. Using a value of 0, is disabling
unackedMessage-limit
- // check and dispatcher can dispatch messages without any restriction
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Max number of unacknowledged messages allowed per shared
subscription. \n\n"
+ + " Broker will stop dispatching messages to all consumers of the
subscription once this "
+ + " limit reaches until consumer starts acknowledging messages
back and unack count reaches"
+ + " to `limit/2`. Using a value of 0, is disabling
unackedMessage-limit check and dispatcher"
+ + " can dispatch messages without any restriction")
private int maxUnackedMessagesPerSubscription = 4 * 50000;
- // Max number of unacknowledged messages allowed per broker. Once this
limit reaches, broker will stop dispatching
- // messages to all shared subscription which has higher number of unack
messages until subscriptions start
- // acknowledging messages back and unack count reaches to limit/2. Using a
value of 0, is disabling
- // unackedMessage-limit check and broker doesn't block dispatchers
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Max number of unacknowledged messages allowed per broker. \n\n"
+ + " Once this limit reaches, broker will stop dispatching messages
to all shared subscription "
+ + " which has higher number of unack messages until subscriptions
start acknowledging messages "
+ + " back and unack count reaches to `limit/2`. Using a value of 0,
is disabling unackedMessage-limit"
+ + " check and broker doesn't block dispatchers")
private int maxUnackedMessagesPerBroker = 0;
- // Once broker reaches maxUnackedMessagesPerBroker limit, it blocks
subscriptions which has higher unacked messages
- // than this percentage limit and subscription will not receive any new
messages until that subscription acks back
- // limit/2 messages
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Once broker reaches maxUnackedMessagesPerBroker limit, it
blocks subscriptions which has higher "
+ + " unacked messages than this percentage limit and subscription
will not receive any new messages "
+ + " until that subscription acks back `limit/2` messages")
private double maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 0.16;
- // Too many subscribe requests from a consumer can cause broker rewinding
consumer cursors and loading data from bookies,
- // hence causing high network bandwidth usage
- // When the positive value is set, broker will throttle the subscribe
requests for one consumer.
- // Otherwise, the throttling will be disabled. The default value of this
setting is 0 - throttling is disabled.
- @FieldContext(dynamic = true)
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ dynamic = true,
+ doc = "Too many subscribe requests from a consumer can cause broker
rewinding consumer cursors "
+ + " and loading data from bookies, hence causing high network
bandwidth usage When the positive"
+ + " value is set, broker will throttle the subscribe requests for
one consumer. Otherwise, the"
+ + " throttling will be disabled. The default value of this setting
is 0 - throttling is disabled.")
private int subscribeThrottlingRatePerConsumer = 0;
- // Rate period for {subscribeThrottlingRatePerConsumer}. Default is 30s.
- @FieldContext(minValue = 1, dynamic = true)
+ @FieldContext(
+ minValue = 1,
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Rate period for {subscribeThrottlingRatePerConsumer}. Default
is 30s."
+ )
private int subscribeRatePeriodPerConsumerInSecond = 30;
- // Default number of message dispatching throttling-limit for every topic.
Using a value of 0, is disabling default
- // message dispatch-throttling
- @FieldContext(dynamic = true)
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Default number of message dispatching throttling-limit for
every topic. \n\n"
+ + "Using a value of 0, is disabling default message
dispatch-throttling")
private int dispatchThrottlingRatePerTopicInMsg = 0;
- // Default number of message-bytes dispatching throttling-limit for every
topic. Using a value of 0, is disabling
- // default message-byte dispatch-throttling
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Default number of message-bytes dispatching throttling-limit
for every topic. \n\n"
+ + "Using a value of 0, is disabling default message-byte
dispatch-throttling")
private long dispatchThrottlingRatePerTopicInByte = 0;
- // Default number of message dispatching throttling-limit for a
subscription.
- // Using a value of 0, is disabling default message dispatch-throttling.
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Default number of message dispatching throttling-limit for a
subscription. \n\n"
+ + "Using a value of 0, is disabling default message
dispatch-throttling.")
private int dispatchThrottlingRatePerSubscriptionInMsg = 0;
- // Default number of message-bytes dispatching throttling-limit for a
subscription.
- // Using a value of 0, is disabling default message-byte
dispatch-throttling.
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Default number of message-bytes dispatching throttling-limit
for a subscription. \n\n"
+ + "Using a value of 0, is disabling default message-byte
dispatch-throttling.")
private long dispatchThrottlingRatePerSubscribeInByte = 0;
- // Default dispatch-throttling is disabled for consumers which already
caught-up with published messages and
- // don't have backlog. This enables dispatch-throttling for non-backlog
consumers as well.
- @FieldContext(dynamic = true)
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Default dispatch-throttling is disabled for consumers which
already caught-up with"
+ + " published messages and don't have backlog. This enables
dispatch-throttling for "
+ + " non-backlog consumers as well.")
private boolean dispatchThrottlingOnNonBacklogConsumerEnabled = false;
- // Max number of concurrent lookup request broker allows to throttle heavy
incoming lookup traffic
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_SERVER,
+ doc = "Max number of concurrent lookup request broker allows to
throttle heavy incoming lookup traffic")
private int maxConcurrentLookupRequest = 50000;
- // Max number of concurrent topic loading request broker allows to control
number of zk-operations
- @FieldContext(dynamic = true)
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_SERVER,
+ doc = "Max number of concurrent topic loading request broker allows to
control number of zk-operations"
+ )
private int maxConcurrentTopicLoadRequest = 5000;
- // Max concurrent non-persistent message can be processed per connection
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Max concurrent non-persistent message can be processed per
connection")
private int maxConcurrentNonPersistentMessagePerConnection = 1000;
- // Number of worker threads to serve non-persistent topic
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Number of worker threads to serve non-persistent topic")
private int numWorkerThreadsForNonPersistentTopic =
Runtime.getRuntime().availableProcessors();;
- // Enable broker to load persistent topics
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Enable broker to load persistent topics"
+ )
private boolean enablePersistentTopics = true;
- // Enable broker to load non-persistent topics
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Enable broker to load non-persistent topics"
+ )
private boolean enableNonPersistentTopics = true;
- // Enable to run bookie along with broker
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Enable to run bookie along with broker"
+ )
private boolean enableRunBookieTogether = false;
- // Enable to run bookie autorecovery along with broker
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Enable to run bookie autorecovery along with broker"
+ )
private boolean enableRunBookieAutoRecoveryTogether = false;
- // Max number of producers allowed to connect to topic. Once this limit
reaches, Broker will reject new producers
- // until the number of connected producers decrease.
- // Using a value of 0, is disabling maxProducersPerTopic-limit check.
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Max number of producers allowed to connect to topic. \n\nOnce
this limit reaches,"
+ + " Broker will reject new producers until the number of connected
producers decrease."
+ + " Using a value of 0, is disabling maxProducersPerTopic-limit
check.")
private int maxProducersPerTopic = 0;
- // Max number of consumers allowed to connect to topic. Once this limit
reaches, Broker will reject new consumers
- // until the number of connected consumers decrease.
- // Using a value of 0, is disabling maxConsumersPerTopic-limit check.
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Max number of consumers allowed to connect to topic. \n\nOnce
this limit reaches,"
+ + " Broker will reject new consumers until the number of connected
consumers decrease."
+ + " Using a value of 0, is disabling maxConsumersPerTopic-limit
check.")
private int maxConsumersPerTopic = 0;
- // Max number of consumers allowed to connect to subscription. Once this
limit reaches, Broker will reject new consumers
- // until the number of connected consumers decrease.
- // Using a value of 0, is disabling maxConsumersPerSubscription-limit
check.
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Max number of consumers allowed to connect to subscription.
\n\nOnce this limit reaches,"
+ + " Broker will reject new consumers until the number of connected
consumers decrease."
+ + " Using a value of 0, is disabling
maxConsumersPerSubscription-limit check.")
private int maxConsumersPerSubscription = 0;
/***** --- TLS --- ****/
+ @FieldContext(
+ category = CATEGORY_TLS,
+ doc = "Enable TLS"
+ )
@Deprecated
private boolean tlsEnabled = false;
- // Path for the TLS certificate file
+ @FieldContext(
+ category = CATEGORY_TLS,
+ doc = "Path for the TLS certificate file"
+ )
private String tlsCertificateFilePath;
- // Path for the TLS private key file
+ @FieldContext(
+ category = CATEGORY_TLS,
+ doc = "Path for the TLS private key file"
+ )
private String tlsKeyFilePath;
- // Path for the trusted TLS certificate file
+ @FieldContext(
+ category = CATEGORY_TLS,
+ doc = "Path for the trusted TLS certificate file"
+ )
private String tlsTrustCertsFilePath = "";
- // Accept untrusted TLS certificate from client
+ @FieldContext(
+ category = CATEGORY_TLS,
+ doc = "Accept untrusted TLS certificate from client"
+ )
private boolean tlsAllowInsecureConnection = false;
- // Specify the tls protocols the broker will use to negotiate during TLS
Handshake.
- // Example:- [TLSv1.2, TLSv1.1, TLSv1]
+ @FieldContext(
+ category = CATEGORY_TLS,
+ doc = "Specify the tls protocols the broker will use to negotiate
during TLS Handshake.\n\n"
+ + "Example:- [TLSv1.2, TLSv1.1, TLSv1]"
+ )
private Set<String> tlsProtocols = Sets.newTreeSet();
- // Specify the tls cipher the broker will use to negotiate during TLS
Handshake.
- // Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
+ @FieldContext(
+ category = CATEGORY_TLS,
+ doc = "Specify the tls cipher the broker will use to negotiate during
TLS Handshake.\n\n"
+ + "Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]"
+ )
private Set<String> tlsCiphers = Sets.newTreeSet();
- // Specify whether Client certificates are required for TLS
- // Reject the Connection if the Client Certificate is not trusted.
+ @FieldContext(
+ category = CATEGORY_TLS,
+ doc = "Specify whether Client certificates are required for TLS
Reject.\n"
+ + "the Connection if the Client Certificate is not trusted")
private boolean tlsRequireTrustedClientCertOnConnect = false;
/***** --- Authentication --- ****/
- // Enable authentication
+ @FieldContext(
+ category = CATEGORY_AUTHENTICATION,
+ doc = "Enable authentication"
+ )
private boolean authenticationEnabled = false;
- // Autentication provider name list, which is a list of class names
+ @FieldContext(
+ category = CATEGORY_AUTHENTICATION,
+ doc = "Autentication provider name list, which is a list of class
names"
+ )
private Set<String> authenticationProviders = Sets.newTreeSet();
- // Enforce authorization
+ @FieldContext(
+ category = CATEGORY_AUTHORIZATION,
+ doc = "Enforce authorization"
+ )
private boolean authorizationEnabled = false;
- // Authorization provider fully qualified class-name
+ @FieldContext(
+ category = CATEGORY_AUTHORIZATION,
+ doc = "Authorization provider fully qualified class-name"
+ )
private String authorizationProvider =
PulsarAuthorizationProvider.class.getName();
- // Role names that are treated as "super-user", meaning they will be able
to
- // do all admin operations and publish/consume from all topics
+ @FieldContext(
+ category = CATEGORY_AUTHORIZATION,
+ doc = "Role names that are treated as `super-user`, meaning they will
be able to"
+ + " do all admin operations and publish/consume from all topics"
+ )
private Set<String> superUserRoles = Sets.newTreeSet();
- // Role names that are treated as "proxy roles". If the broker sees a
request with
- // role as proxyRoles - it will demand to see the original client role or
certificate.
+ @FieldContext(
+ category = CATEGORY_AUTHORIZATION,
+ doc = "Role names that are treated as `proxy roles`. \n\nIf the broker
sees"
+ + " a request with role as proxyRoles - it will demand to see the
original"
+ + " client role or certificate.")
private Set<String> proxyRoles = Sets.newTreeSet();
- // If this flag is set then the broker authenticates the original Auth data
- // else it just accepts the originalPrincipal and authorizes it (if
required).
+ @FieldContext(
+ category = CATEGORY_AUTHORIZATION,
+ doc = "If this flag is set then the broker authenticates the original
Auth data"
+ + " else it just accepts the originalPrincipal and authorizes it
(if required)")
private boolean authenticateOriginalAuthData = false;
- // Allow wildcard matching in authorization
- // (wildcard matching only applicable if wildcard-char:
- // * presents at first or last position eg: *.pulsar.service,
pulsar.service.*)
+ @FieldContext(
+ category = CATEGORY_AUTHORIZATION,
+ doc = "Allow wildcard matching in authorization\n\n"
+ + "(wildcard matching only applicable if wildcard-char: * presents
at first"
+ + " or last position eg: *.pulsar.service, pulsar.service.*)")
private boolean authorizationAllowWildcardsMatching = false;
- // Authentication settings of the broker itself. Used when the broker
connects
- // to other brokers, either in same or other clusters. Default uses plugin
which disables authentication
+ @FieldContext(
+ category = CATEGORY_AUTHENTICATION,
+ doc = "Authentication settings of the broker itself. \n\nUsed when the
broker connects"
+ + " to other brokers, either in same or other clusters. Default
uses plugin which disables authentication"
+ )
private String brokerClientAuthenticationPlugin =
"org.apache.pulsar.client.impl.auth.AuthenticationDisabled";
+ @FieldContext(
+ category = CATEGORY_AUTHENTICATION,
+ doc = "Authentication parameters of the authentication plugin the
broker is using to connect to other brokers"
+ )
private String brokerClientAuthenticationParameters = "";
- // Path for the trusted TLS certificate file for outgoing connection to a
server (broker)
+ @FieldContext(
+ category = CATEGORY_AUTHENTICATION,
+ doc = "Path for the trusted TLS certificate file for outgoing
connection to a server (broker)")
private String brokerClientTrustCertsFilePath = "";
- // When this parameter is not empty, unauthenticated users perform as
anonymousUserRole
+ @FieldContext(
+ category = CATEGORY_AUTHORIZATION,
+ doc = "When this parameter is not empty, unauthenticated users perform
as anonymousUserRole"
+ )
private String anonymousUserRole = null;
/**** --- BookKeeper Client --- ****/
- // Authentication plugin to use when connecting to bookies
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Authentication plugin to use when connecting to bookies"
+ )
private String bookkeeperClientAuthenticationPlugin;
- // BookKeeper auth plugin implementatation specifics parameters name and
values
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "BookKeeper auth plugin implementatation specifics parameters
name and values"
+ )
private String bookkeeperClientAuthenticationParametersName;
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Parameters for bookkeeper auth plugin"
+ )
private String bookkeeperClientAuthenticationParameters;
- // Timeout for BK add / read operations
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Timeout for BK add / read operations"
+ )
private long bookkeeperClientTimeoutInSeconds = 30;
- // Speculative reads are initiated if a read request doesn't complete
within
- // a certain time Using a value of 0, is disabling the speculative reads
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Speculative reads are initiated if a read request doesn't
complete within"
+ + " a certain time Using a value of 0, is disabling the
speculative reads")
private int bookkeeperClientSpeculativeReadTimeoutInMillis = 0;
- // Use older Bookkeeper wire protocol with bookie
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Use older Bookkeeper wire protocol with bookie"
+ )
private boolean bookkeeperUseV2WireProtocol = true;
- // Enable bookies health check. Bookies that have more than the configured
- // number of failure within the interval will be quarantined for some time.
- // During this period, new ledgers won't be created on these bookies
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Enable bookies health check. \n\n Bookies that have more than
the configured"
+ + " number of failure within the interval will be quarantined for
some time."
+ + " During this period, new ledgers won't be created on these
bookies")
private boolean bookkeeperClientHealthCheckEnabled = true;
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Bookies health check interval in seconds"
+ )
private long bookkeeperClientHealthCheckIntervalSeconds = 60;
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Bookies health check error threshold per check interval"
+ )
private long bookkeeperClientHealthCheckErrorThresholdPerInterval = 5;
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Bookie health check quarantined time in seconds"
+ )
private long bookkeeperClientHealthCheckQuarantineTimeInSeconds = 1800;
- // Enable rack-aware bookie selection policy. BK will chose bookies from
- // different racks when forming a new bookie ensemble
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Enable rack-aware bookie selection policy. \n\nBK will chose
bookies from"
+ + " different racks when forming a new bookie ensemble")
private boolean bookkeeperClientRackawarePolicyEnabled = true;
- // Enable region-aware bookie selection policy. BK will chose bookies from
- // different regions and racks when forming a new bookie ensemble
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Enable region-aware bookie selection policy. \n\nBK will chose
bookies from"
+ + " different regions and racks when forming a new bookie
ensemble")
private boolean bookkeeperClientRegionawarePolicyEnabled = false;
- // Enable/disable reordering read sequence on reading entries.
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Enable/disable reordering read sequence on reading entries")
private boolean bookkeeperClientReorderReadSequenceEnabled = false;
- // Enable bookie isolation by specifying a list of bookie groups to choose
- // from. Any bookie outside the specified groups will not be used by the
- // broker
- @FieldContext(required = false)
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ required = false,
+ doc = "Enable bookie isolation by specifying a list of bookie groups
to choose from. \n\n"
+ + "Any bookie outside the specified groups will not be used by the
broker")
private String bookkeeperClientIsolationGroups;
/**** --- Managed Ledger --- ****/
- // Number of bookies to use when creating a ledger
- @FieldContext(minValue = 1)
+ @FieldContext(
+ minValue = 1,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Number of bookies to use when creating a ledger"
+ )
private int managedLedgerDefaultEnsembleSize = 1;
- // Number of copies to store for each message
- @FieldContext(minValue = 1)
+ @FieldContext(
+ minValue = 1,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Number of copies to store for each message"
+ )
private int managedLedgerDefaultWriteQuorum = 1;
- // Number of guaranteed copies (acks to wait before write is complete)
- @FieldContext(minValue = 1)
+ @FieldContext(
+ minValue = 1,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Number of guaranteed copies (acks to wait before write is
complete)"
+ )
private int managedLedgerDefaultAckQuorum = 1;
- // Default type of checksum to use when writing to BookKeeper. Default is
"CRC32C"
- // Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
+ //
+ //
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Default type of checksum to use when writing to BookKeeper.
\n\nDefault is `CRC32C`."
+ + " Other possible options are `CRC32`, `MAC` or `DUMMY` (no
checksum)."
+ )
private DigestType managedLedgerDigestType = DigestType.CRC32C;
- // Max number of bookies to use when creating a ledger
- @FieldContext(minValue = 1)
+ @FieldContext(
+ minValue = 1,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Max number of bookies to use when creating a ledger"
+ )
private int managedLedgerMaxEnsembleSize = 5;
- // Max number of copies to store for each message
- @FieldContext(minValue = 1)
+ @FieldContext(
+ minValue = 1,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Max number of copies to store for each message"
+ )
private int managedLedgerMaxWriteQuorum = 5;
- // Max number of guaranteed copies (acks to wait before write is complete)
- @FieldContext(minValue = 1)
+ @FieldContext(
+ minValue = 1,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Max number of guaranteed copies (acks to wait before write is
complete)"
+ )
private int managedLedgerMaxAckQuorum = 5;
- // Amount of memory to use for caching data payload in managed ledger. This
- // memory
- // is allocated from JVM direct memory and it's shared across all the
topics
- // running in the same broker
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Amount of memory to use for caching data payload in managed
ledger. \n\nThis"
+ + " memory is allocated from JVM direct memory and it's shared
across all the topics"
+ + " running in the same broker")
private int managedLedgerCacheSizeMB = 1024;
- // Threshold to which bring down the cache level when eviction is triggered
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Threshold to which bring down the cache level when eviction is
triggered"
+ )
private double managedLedgerCacheEvictionWatermark = 0.9f;
- // Rate limit the amount of writes per second generated by consumer acking
the messages
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Rate limit the amount of writes per second generated by
consumer acking the messages"
+ )
private double managedLedgerDefaultMarkDeleteRateLimit = 1.0;
- // Number of threads to be used for managed ledger tasks dispatching
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Number of threads to be used for managed ledger tasks
dispatching"
+ )
private int managedLedgerNumWorkerThreads =
Runtime.getRuntime().availableProcessors();
- // Number of threads to be used for managed ledger scheduled tasks
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Number of threads to be used for managed ledger scheduled tasks"
+ )
private int managedLedgerNumSchedulerThreads =
Runtime.getRuntime().availableProcessors();
- // Max number of entries to append to a ledger before triggering a rollover
- // A ledger rollover is triggered on these conditions Either the max
- // rollover time has been reached or max entries have been written to the
- // ledged and at least min-time has passed
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Max number of entries to append to a ledger before triggering a
rollover.\n\n"
+ + "A ledger rollover is triggered on these conditions Either the
max"
+ + " rollover time has been reached or max entries have been
written to the"
+ + " ledged and at least min-time has passed")
private int managedLedgerMaxEntriesPerLedger = 50000;
- // Minimum time between ledger rollover for a topic
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Minimum time between ledger rollover for a topic"
+ )
private int managedLedgerMinLedgerRolloverTimeMinutes = 10;
- // Maximum time before forcing a ledger rollover for a topic
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Maximum time before forcing a ledger rollover for a topic"
+ )
private int managedLedgerMaxLedgerRolloverTimeMinutes = 240;
- // Delay between a ledger being successfully offloaded to long term storage
- // and the ledger being deleted from bookkeeper
+ @FieldContext(
+ category = CATEGORY_STORAGE_OFFLOADING,
+ doc = "Delay between a ledger being successfully offloaded to long
term storage,"
+ + " and the ledger being deleted from bookkeeper"
+ )
private long managedLedgerOffloadDeletionLagMs =
TimeUnit.HOURS.toMillis(4);
- // Max number of entries to append to a cursor ledger
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Max number of entries to append to a cursor ledger"
+ )
private int managedLedgerCursorMaxEntriesPerLedger = 50000;
- // Max time before triggering a rollover on a cursor ledger
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Max time before triggering a rollover on a cursor ledger"
+ )
private int managedLedgerCursorRolloverTimeInSeconds = 14400;
- // Max number of "acknowledgment holes" that are going to be persistently
stored.
- // When acknowledging out of order, a consumer will leave holes that are
supposed
- // to be quickly filled by acking all the messages. The information of
which
- // messages are acknowledged is persisted by compressing in "ranges" of
messages
- // that were acknowledged. After the max number of ranges is reached, the
information
- // will only be tracked in memory and messages will be redelivered in case
of
- // crashes.
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Max number of `acknowledgment holes` that are going to be
persistently stored.\n\n"
+ + "When acknowledging out of order, a consumer will leave holes
that are supposed"
+ + " to be quickly filled by acking all the messages. The
information of which"
+ + " messages are acknowledged is persisted by compressing in
`ranges` of messages"
+ + " that were acknowledged. After the max number of ranges is
reached, the information"
+ + " will only be tracked in memory and messages will be
redelivered in case of"
+ + " crashes.")
private int managedLedgerMaxUnackedRangesToPersist = 10000;
- // Max number of "acknowledgment holes" that can be stored in Zookeeper.
If number of unack message range is higher
- // than this limit then broker will persist unacked ranges into bookkeeper
to avoid additional data overhead into
- // zookeeper.
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Max number of `acknowledgment holes` that can be stored in
Zookeeper.\n\n"
+ + "If number of unack message range is higher than this limit then
broker will persist"
+ + " unacked ranges into bookkeeper to avoid additional data
overhead into zookeeper.")
private int managedLedgerMaxUnackedRangesToPersistInZooKeeper = 1000;
- // Skip reading non-recoverable/unreadable data-ledger under
managed-ledger's list. It helps when data-ledgers gets
- // corrupted at bookkeeper and managed-cursor is stuck at that ledger.
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Skip reading non-recoverable/unreadable data-ledger under
managed-ledger's list.\n\n"
+ + " It helps when data-ledgers gets corrupted at bookkeeper and
managed-cursor is stuck at that ledger."
+ )
private boolean autoSkipNonRecoverableData = false;
- // operation timeout while updating managed-ledger metadata.
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "operation timeout while updating managed-ledger metadata."
+ )
private long managedLedgerMetadataOperationsTimeoutSeconds = 60;
/*** --- Load balancer --- ****/
- // Enable load balancer
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Enable load balancer"
+ )
private boolean loadBalancerEnabled = true;
- // load placement strategy[weightedRandomSelection/leastLoadedServer]
(only used by SimpleLoadManagerImpl)
@Deprecated
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ deprecated = true,
+ doc = "load placement
strategy[weightedRandomSelection/leastLoadedServer] (only used by
SimpleLoadManagerImpl)"
+ )
private String loadBalancerPlacementStrategy = "leastLoadedServer"; //
weighted random selection
- // Percentage of change to trigger load report update
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Percentage of change to trigger load report update"
+ )
private int loadBalancerReportUpdateThresholdPercentage = 10;
- // maximum interval to update load report
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "maximum interval to update load report"
+ )
private int loadBalancerReportUpdateMaxIntervalMinutes = 15;
- // Frequency of report to collect
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Frequency of report to collect, in minutes"
+ )
private int loadBalancerHostUsageCheckIntervalMinutes = 1;
- // Enable/disable automatic bundle unloading for load-shedding
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Enable/disable automatic bundle unloading for load-shedding"
+ )
private boolean loadBalancerSheddingEnabled = true;
- // Load shedding interval. Broker periodically checks whether some traffic
should be offload from some over-loaded
- // broker to other under-loaded brokers
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Load shedding interval. \n\nBroker periodically checks whether
some traffic"
+ + " should be offload from some over-loaded broker to other
under-loaded brokers"
+ )
private int loadBalancerSheddingIntervalMinutes = 1;
- // Prevent the same topics to be shed and moved to other broker more that
- // once within this timeframe
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Prevent the same topics to be shed and moved to other broker
more that"
+ + " once within this timeframe"
+ )
private long loadBalancerSheddingGracePeriodMinutes = 30;
- // Usage threshold to determine a broker as under-loaded (only used by
SimpleLoadManagerImpl)
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ deprecated = true,
+ doc = "Usage threshold to determine a broker as under-loaded (only
used by SimpleLoadManagerImpl)"
+ )
@Deprecated
private int loadBalancerBrokerUnderloadedThresholdPercentage = 50;
- // Usage threshold to allocate max number of topics to broker
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Usage threshold to allocate max number of topics to broker"
+ )
private int loadBalancerBrokerMaxTopics = 50000;
- // Usage threshold to determine a broker as over-loaded
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Usage threshold to determine a broker as over-loaded"
+ )
private int loadBalancerBrokerOverloadedThresholdPercentage = 85;
- // Interval to flush dynamic resource quota to ZooKeeper
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Interval to flush dynamic resource quota to ZooKeeper"
+ )
private int loadBalancerResourceQuotaUpdateIntervalMinutes = 15;
- // Usage threshold to determine a broker is having just right level of
load (only used by SimpleLoadManagerImpl)
@Deprecated
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ deprecated = true,
+ doc = "Usage threshold to determine a broker is having just right
level of load"
+ + " (only used by SimpleLoadManagerImpl)"
+ )
private int loadBalancerBrokerComfortLoadLevelPercentage = 65;
- // enable/disable automatic namespace bundle split
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "enable/disable automatic namespace bundle split"
+ )
private boolean loadBalancerAutoBundleSplitEnabled = true;
- // enable/disable automatic unloading of split bundles
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "enable/disable automatic unloading of split bundles"
+ )
private boolean loadBalancerAutoUnloadSplitBundlesEnabled = true;
- // maximum topics in a bundle, otherwise bundle split will be triggered
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "maximum topics in a bundle, otherwise bundle split will be
triggered"
+ )
private int loadBalancerNamespaceBundleMaxTopics = 1000;
- // maximum sessions (producers + consumers) in a bundle, otherwise bundle
split will be triggered
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "maximum sessions (producers + consumers) in a bundle, otherwise
bundle split will be triggered"
+ )
private int loadBalancerNamespaceBundleMaxSessions = 1000;
- // maximum msgRate (in + out) in a bundle, otherwise bundle split will be
triggered
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "maximum msgRate (in + out) in a bundle, otherwise bundle split
will be triggered"
+ )
private int loadBalancerNamespaceBundleMaxMsgRate = 30000;
- // maximum bandwidth (in + out) in a bundle, otherwise bundle split will
be triggered
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "maximum bandwidth (in + out) in a bundle, otherwise bundle
split will be triggered"
+ )
private int loadBalancerNamespaceBundleMaxBandwidthMbytes = 100;
- // maximum number of bundles in a namespace
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "maximum number of bundles in a namespace"
+ )
private int loadBalancerNamespaceMaximumBundles = 128;
- // Name of load manager to use
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Name of load manager to use"
+ )
private String loadManagerClassName =
"org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl";
- // Option to override the auto-detected network interfaces max speed
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Option to override the auto-detected network interfaces max
speed"
+ )
private Double loadBalancerOverrideBrokerNicSpeedGbps;
/**** --- Replication --- ****/
- // Enable replication metrics
+ @FieldContext(
+ category = CATEGORY_REPLICATION,
+ doc = "Enable replication metrics"
+ )
private boolean replicationMetricsEnabled = false;
- // Max number of connections to open for each broker in a remote cluster
- // More connections host-to-host lead to better throughput over
high-latency
- // links.
+ @FieldContext(
+ category = CATEGORY_REPLICATION,
+ doc = "Max number of connections to open for each broker in a remote
cluster.\n\n"
+ + "More connections host-to-host lead to better throughput over
high-latency links"
+ )
private int replicationConnectionsPerBroker = 16;
- @FieldContext(required = false)
- // replicator prefix used for replicator producer name and cursor name
+ @FieldContext(
+ required = false,
+ category = CATEGORY_REPLICATION,
+ doc = "replicator prefix used for replicator producer name and cursor
name"
+ )
private String replicatorPrefix = "pulsar.repl";
- // Replicator producer queue size;
+ @FieldContext(
+ category = CATEGORY_REPLICATION,
+ doc = "Replicator producer queue size"
+ )
private int replicationProducerQueueSize = 1000;
- // @deprecated - Use brokerClientTlsEnabled instead.
@Deprecated
+ @FieldContext(
+ category = CATEGORY_REPLICATION,
+ deprecated = true,
+ doc = "@deprecated - Use brokerClientTlsEnabled instead."
+ )
private boolean replicationTlsEnabled = false;
- // Enable TLS when talking with other brokers in the same cluster (admin
operation) or different clusters (replication)
+ @FieldContext(
+ category = CATEGORY_REPLICATION,
+ doc = "Enable TLS when talking with other brokers in the same cluster
(admin operation)"
+ + " or different clusters (replication)"
+ )
private boolean brokerClientTlsEnabled = false;
- // Default message retention time
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Default message retention time"
+ )
private int defaultRetentionTimeInMinutes = 0;
- // Default retention size
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Default retention size"
+ )
private int defaultRetentionSizeInMB = 0;
- // How often to check pulsar connection is still alive
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "How often to check pulsar connection is still alive"
+ )
private int keepAliveIntervalSeconds = 30;
- // How often broker checks for inactive topics to be deleted (topics with
no subscriptions and no one connected)
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "How often broker checks for inactive topics to be deleted
(topics with no subscriptions and no one connected)"
+ )
private int brokerServicePurgeInactiveFrequencyInSeconds = 60;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "A comma-separated list of namespaces to bootstrap"
+ )
private List<String> bootstrapNamespaces = new ArrayList<String>();
private Properties properties = new Properties();
- // If true, (and ModularLoadManagerImpl is being used), the load manager
will attempt to
- // use only brokers running the latest software version (to minimize
impact to bundles)
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_SERVER,
+ doc = "If true, (and ModularLoadManagerImpl is being used), the load
manager will attempt to "
+ + "use only brokers running the latest software version (to
minimize impact to bundles)"
+ )
private boolean preferLaterVersions = false;
- // Interval between checks to see if topics with compaction policies need
to be compacted
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Interval between checks to see if topics with compaction
policies need to be compacted"
+ )
private int brokerServiceCompactionMonitorIntervalInSeconds = 60;
+ @FieldContext(
+ category = CATEGORY_SCHEMA,
+ doc = "Enforce schema validation on following cases:\n\n"
+ + " - if a producer without a schema attempts to produce to a
topic with schema, the producer will be\n"
+ + " failed to connect. PLEASE be carefully on using this, since
non-java clients don't support schema.\n"
+ + " if you enable this setting, it will cause non-java clients
failed to produce."
+ )
private boolean isSchemaValidationEnforced = false;
+ @FieldContext(
+ category = CATEGORY_SCHEMA,
+ doc = "The schema storage implementation used by this broker"
+ )
private String schemaRegistryStorageClassName =
"org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory";
+ @FieldContext(
+ category = CATEGORY_SCHEMA,
+ doc = "The list compatibility checkers to be used in schema registry"
+ )
private Set<String> schemaRegistryCompatibilityCheckers = Sets.newHashSet(
"org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck",
"org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck"
);
/**** --- WebSocket --- ****/
- // Number of IO threads in Pulsar Client used in WebSocket proxy
+ @FieldContext(
+ category = CATEGORY_WEBSOCKET,
+ doc = "Number of IO threads in Pulsar Client used in WebSocket proxy"
+ )
private int webSocketNumIoThreads =
Runtime.getRuntime().availableProcessors();
- // Number of connections per Broker in Pulsar Client used in WebSocket
proxy
+ @FieldContext(
+ category = CATEGORY_WEBSOCKET,
+ doc = "Number of connections per Broker in Pulsar Client used in
WebSocket proxy"
+ )
private int webSocketConnectionsPerBroker =
Runtime.getRuntime().availableProcessors();
- // Time in milliseconds that idle WebSocket session times out
+ @FieldContext(
+ category = CATEGORY_WEBSOCKET,
+ doc = "Time in milliseconds that idle WebSocket session times out"
+ )
private int webSocketSessionIdleTimeoutMillis = 300000;
/**** --- Metrics --- ****/
- // If true, export topic level metrics otherwise namespace level
+ @FieldContext(
+ category = CATEGORY_METRICS,
+ doc = "If true, export topic level metrics otherwise namespace level"
+ )
private boolean exposeTopicLevelMetricsInPrometheus = true;
+ @FieldContext(
+ category = CATEGORY_METRICS,
+ doc = "If true, export consumer level metrics otherwise namespace
level"
+ )
private boolean exposeConsumerLevelMetricsInPrometheus = false;
/**** --- Functions --- ****/
+ @FieldContext(
+ category = CATEGORY_FUNCTIONS,
+ doc = "Flag indicates enabling or disabling function worker on brokers"
+ )
private boolean functionsWorkerEnabled = false;
/**** --- Broker Web Stats --- ****/
- // If true, export publisher stats when returning topics stats from the
admin rest api
+ @FieldContext(
+ category = CATEGORY_METRICS,
+ doc = "If true, export publisher stats when returning topics stats
from the admin rest api"
+ )
private boolean exposePublisherStats = true;
+ @FieldContext(
+ category = CATEGORY_METRICS,
+ doc = "Stats update frequency in seconds"
+ )
private int statsUpdateFrequencyInSecs = 60;
+ @FieldContext(
+ category = CATEGORY_METRICS,
+ doc = "Stats update initial delay in seconds"
+ )
private int statsUpdateInitialDelayInSecs = 60;
/**** --- Ledger Offloading --- ****/
@@ -526,13 +1039,22 @@ public class ServiceConfiguration implements
PulsarConfiguration {
* NOTES: all implementation related settings should be put in
implementation package.
* only common settings like driver name, io threads can be added
here.
****/
- // The directory to locate offloaders
+ @FieldContext(
+ category = CATEGORY_STORAGE_OFFLOADING,
+ doc = "The directory to locate offloaders"
+ )
private String offloadersDirectory = "./offloaders";
- // Driver to use to offload old data to long term storage
+ @FieldContext(
+ category = CATEGORY_STORAGE_OFFLOADING,
+ doc = "Driver to use to offload old data to long term storage"
+ )
private String managedLedgerOffloadDriver = null;
- // Maximum number of thread pool threads for ledger offloading
+ @FieldContext(
+ category = CATEGORY_STORAGE_OFFLOADING,
+ doc = "Maximum number of thread pool threads for ledger offloading"
+ )
private int managedLedgerOffloadMaxThreads = 2;
/**
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
index df0a1ec..aad9a2f 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
@@ -25,6 +25,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
@@ -180,8 +181,10 @@ public class PulsarConfigurationLoader {
try {
Field convertedConfField =
ServiceConfiguration.class.getDeclaredField(confField.getName());
confField.setAccessible(true);
- convertedConfField.setAccessible(true);
- convertedConfField.set(convertedConf, confField.get(conf));
+ if (!Modifier.isStatic(convertedConfField.getModifiers()))
{
+ convertedConfField.setAccessible(true);
+ convertedConfField.set(convertedConf,
confField.get(conf));
+ }
} catch (NoSuchFieldException e) {
if (!ignoreNonExistMember) {
throw new IllegalArgumentException("Exception caused
while converting configuration: " + e.getMessage());