merlimat closed pull request #2412: Allow to configure number of IO and managed
ledger threads
URL: https://github.com/apache/incubator-pulsar/pull/2412
This pull request was merged from a forked repository.
Because GitHub hides the original diff of a fork's pull request once it
is merged, the diff is reproduced below for the sake of provenance:
diff --git a/conf/broker.conf b/conf/broker.conf
index 427d848264..5465c9acaa 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -43,6 +43,9 @@ bindAddress=0.0.0.0
# Hostname or IP address the service advertises to the outside world. If not
set, the value of InetAddress.getLocalHost().getHostName() is used.
advertisedAddress=
+# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
+numIOThreads=
+
# Name of the cluster to which this broker belongs to
clusterName=
@@ -311,6 +314,12 @@ managedLedgerDefaultAckQuorum=2
# Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
managedLedgerDigestType=CRC32C
+# Number of threads to be used for managed ledger tasks dispatching
+managedLedgerNumWorkerThreads=8
+
+# Number of threads to be used for managed ledger scheduled tasks
+managedLedgerNumSchedulerThreads=8
+
# Amount of memory to use for caching data payload in managed ledger. This
memory
# is allocated from JVM direct memory and it's shared across all the topics
# running in the same broker
diff --git a/conf/standalone.conf b/conf/standalone.conf
index cc03b958d2..13bef8a372 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -36,6 +36,9 @@ bindAddress=0.0.0.0
# Hostname or IP address the service advertises to the outside world. If not
set, the value of InetAddress.getLocalHost().getHostName() is used.
advertisedAddress=
+# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
+numIOThreads=
+
# Name of the cluster to which this broker belongs to
clusterName=standalone
@@ -261,6 +264,12 @@ managedLedgerDefaultAckQuorum=1
# Other possible options are "CRC32C" (which is faster), "MAC" or "DUMMY" (no
checksum).
managedLedgerDigestType=CRC32
+# Number of threads to be used for managed ledger tasks dispatching
+managedLedgerNumWorkerThreads=4
+
+# Number of threads to be used for managed ledger scheduled tasks
+managedLedgerNumSchedulerThreads=4
+
# Amount of memory to use for caching data payload in managed ledger. This
memory
# is allocated from JVM direct memory and it's shared across all the topics
# running in the same broker
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index 6f1f41d4bd..81673f4351 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -18,42 +18,26 @@
*/
package org.apache.bookkeeper.mledger;
+import lombok.Data;
+
/**
* Configuration for a {@link ManagedLedgerFactory}.
*/
+@Data
public class ManagedLedgerFactoryConfig {
private static final long MB = 1024 * 1024;
private long maxCacheSize = 128 * MB;
- private double cacheEvictionWatermark = 0.90;
-
- public long getMaxCacheSize() {
- return maxCacheSize;
- }
/**
- *
- * @param maxCacheSize
- * @return
+ * The cache eviction watermark is the percentage of the cache size to
reach when removing entries from the cache.
*/
- public ManagedLedgerFactoryConfig setMaxCacheSize(long maxCacheSize) {
- this.maxCacheSize = maxCacheSize;
- return this;
- }
+ private double cacheEvictionWatermark = 0.90;
- public double getCacheEvictionWatermark() {
- return cacheEvictionWatermark;
- }
+ private int numManagedLedgerWorkerThreads =
Runtime.getRuntime().availableProcessors();
+ private int numManagedLedgerSchedulerThreads =
Runtime.getRuntime().availableProcessors();
- /**
- * The cache eviction watermark is the percentage of the cache size to
reach when removing entries from the cache.
- *
- * @param cacheEvictionWatermark
- * @return
- */
- public ManagedLedgerFactoryConfig setCacheEvictionWatermark(double
cacheEvictionWatermark) {
- this.cacheEvictionWatermark = cacheEvictionWatermark;
- return this;
+ public long getMaxCacheSize() {
+ return maxCacheSize;
}
-
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 6eb33c684d..b9f63a850b 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -19,7 +19,6 @@
package org.apache.bookkeeper.mledger.impl;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
import static
org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
import com.google.common.base.Predicates;
@@ -53,11 +52,11 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
-import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.CursorInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.MessageRangeInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.PositionInfo;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ManagedLedgerInitializeLedgerCallback;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
@@ -68,10 +67,9 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.util.Futures;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.pulsar.common.util.DateFormatter;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooKeeper.States;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,10 +79,8 @@
private final boolean isBookkeeperManaged;
private final ZooKeeper zookeeper;
private final ManagedLedgerFactoryConfig config;
- protected final OrderedScheduler scheduledExecutor =
OrderedScheduler.newSchedulerBuilder().numThreads(16)
- .name("bookkeeper-ml-scheduler").build();
- private final OrderedExecutor orderedExecutor =
OrderedExecutor.newBuilder().numThreads(16)
- .name("bookkeeper-ml-workers").build();
+ protected final OrderedScheduler scheduledExecutor;
+ private final OrderedExecutor orderedExecutor;
protected final ManagedLedgerFactoryMBeanImpl mbean;
@@ -99,34 +95,20 @@ public ManagedLedgerFactoryImpl(ClientConfiguration
bkClientConfiguration) throw
this(bkClientConfiguration, new ManagedLedgerFactoryConfig());
}
+ @SuppressWarnings("deprecation")
public ManagedLedgerFactoryImpl(ClientConfiguration bkClientConfiguration,
ManagedLedgerFactoryConfig config)
throws Exception {
- final CountDownLatch counter = new CountDownLatch(1);
- final String zookeeperQuorum =
checkNotNull(bkClientConfiguration.getZkServers());
-
- zookeeper = new ZooKeeper(zookeeperQuorum,
bkClientConfiguration.getZkTimeout(), event -> {
- if
(event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
- log.info("Connected to zookeeper");
- counter.countDown();
- } else {
- log.error("Error connecting to zookeeper {}", event);
- }
- });
-
- if (!counter.await(bkClientConfiguration.getZkTimeout(),
TimeUnit.MILLISECONDS)
- || zookeeper.getState() != States.CONNECTED) {
- throw new ManagedLedgerException("Error connecting to ZooKeeper at
'" + zookeeperQuorum + "'");
- }
-
- this.bookKeeper = new BookKeeper(bkClientConfiguration, zookeeper);
- this.isBookkeeperManaged = true;
-
- this.store = new MetaStoreImplZookeeper(zookeeper, orderedExecutor);
+ this(ZooKeeperClient.newBuilder()
+ .connectString(bkClientConfiguration.getZkServers())
+ .sessionTimeoutMs(bkClientConfiguration.getZkTimeout())
+ .build(), bkClientConfiguration, config);
+ }
- this.config = config;
- this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
- this.entryCacheManager = new EntryCacheManager(this);
- this.statsTask = scheduledExecutor.scheduleAtFixedRate(() ->
refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS);
+ private ManagedLedgerFactoryImpl(ZooKeeper zkc, ClientConfiguration
bkClientConfiguration,
+ ManagedLedgerFactoryConfig config)
+ throws Exception {
+ this(new BookKeeper(bkClientConfiguration, zkc), true /*
isBookkeeperManaged */, zkc,
+ config);
}
public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper
zooKeeper) throws Exception {
@@ -135,9 +117,23 @@ public ManagedLedgerFactoryImpl(BookKeeper bookKeeper,
ZooKeeper zooKeeper) thro
public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper
zooKeeper, ManagedLedgerFactoryConfig config)
throws Exception {
+ this(bookKeeper, false /* isBookkeeperManaged */, zooKeeper, config);
+ }
+
+ private ManagedLedgerFactoryImpl(BookKeeper bookKeeper, boolean
isBookkeeperManaged, ZooKeeper zooKeeper,
+ ManagedLedgerFactoryConfig config) throws Exception {
+ scheduledExecutor = OrderedScheduler.newSchedulerBuilder()
+ .numThreads(config.getNumManagedLedgerSchedulerThreads())
+ .name("bookkeeper-ml-scheduler")
+ .build();
+ orderedExecutor = OrderedExecutor.newBuilder()
+ .numThreads(config.getNumManagedLedgerWorkerThreads())
+ .name("bookkeeper-ml-workers")
+ .build();
+
this.bookKeeper = bookKeeper;
- this.isBookkeeperManaged = false;
- this.zookeeper = null;
+ this.isBookkeeperManaged = isBookkeeperManaged;
+ this.zookeeper = isBookkeeperManaged ? zooKeeper : null;
this.store = new MetaStoreImplZookeeper(zooKeeper, orderedExecutor);
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 9efa2a62e0..8ad0fc83da 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -61,6 +61,9 @@
// Controls which hostname is advertised to the discovery service via
ZooKeeper.
private String advertisedAddress;
+ // Number of threads to use for Netty IO
+ private int numIOThreads = 2 * Runtime.getRuntime().availableProcessors();
+
// Enable the WebSocket API service
private boolean webSocketServiceEnabled = false;
@@ -326,6 +329,12 @@
private double managedLedgerCacheEvictionWatermark = 0.9f;
// Rate limit the amount of writes per second generated by consumer acking
the messages
private double managedLedgerDefaultMarkDeleteRateLimit = 1.0;
+
+ // Number of threads to be used for managed ledger tasks dispatching
+ private int managedLedgerNumWorkerThreads =
Runtime.getRuntime().availableProcessors();
+ // Number of threads to be used for managed ledger scheduled tasks
+ private int managedLedgerNumSchedulerThreads =
Runtime.getRuntime().availableProcessors();
+
// Max number of entries to append to a ledger before triggering a rollover
// A ledger rollover is triggered on these conditions Either the max
// rollover time has been reached or max entries have been written to the
@@ -579,6 +588,14 @@ public void setAdvertisedAddress(String advertisedAddress)
{
this.advertisedAddress = advertisedAddress;
}
+ public int getNumIOThreads() {
+ return numIOThreads;
+ }
+
+ public void setNumIOThreads(int numIOThreads) {
+ this.numIOThreads = numIOThreads;
+ }
+
public boolean isWebSocketServiceEnabled() {
return webSocketServiceEnabled;
}
@@ -1264,6 +1281,22 @@ public void
setManagedLedgerMaxUnackedRangesToPersistInZooKeeper(
this.managedLedgerMaxUnackedRangesToPersistInZooKeeper =
managedLedgerMaxUnackedRangesToPersistInZookeeper;
}
+ public int getManagedLedgerNumWorkerThreads() {
+ return managedLedgerNumWorkerThreads;
+ }
+
+ public void setManagedLedgerNumWorkerThreads(int
managedLedgerNumWorkerThreads) {
+ this.managedLedgerNumWorkerThreads = managedLedgerNumWorkerThreads;
+ }
+
+ public int getManagedLedgerNumSchedulerThreads() {
+ return managedLedgerNumSchedulerThreads;
+ }
+
+ public void setManagedLedgerNumSchedulerThreads(int
managedLedgerNumSchedulerThreads) {
+ this.managedLedgerNumSchedulerThreads =
managedLedgerNumSchedulerThreads;
+ }
+
public boolean isAutoSkipNonRecoverableData() {
return autoSkipNonRecoverableData;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 7eeb949799..3189df265d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -44,6 +44,8 @@ public ManagedLedgerClientFactory(ServiceConfiguration conf,
ZooKeeper zkClient,
ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new
ManagedLedgerFactoryConfig();
managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() *
1024L * 1024L);
managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
+
managedLedgerFactoryConfig.setNumManagedLedgerWorkerThreads(conf.getManagedLedgerNumWorkerThreads());
+
managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(conf.getManagedLedgerNumSchedulerThreads());
this.managedLedgerFactory = new ManagedLedgerFactoryImpl(bkClient,
zkClient, managedLedgerFactoryConfig);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 42893a7676..8620e82a87 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -209,7 +209,7 @@ public BrokerService(PulsarService pulsar) throws Exception
{
.name("broker-topic-workers").build();
final DefaultThreadFactory acceptorThreadFactory = new
DefaultThreadFactory("pulsar-acceptor");
final DefaultThreadFactory workersThreadFactory = new
DefaultThreadFactory("pulsar-io");
- final int numThreads = Runtime.getRuntime().availableProcessors() * 2;
+ final int numThreads = pulsar.getConfiguration().getNumIOThreads();
log.info("Using {} threads for broker service IO", numThreads);
this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1,
acceptorThreadFactory);
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
index e476fc5e95..d344c30ce5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
@@ -33,6 +33,8 @@
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+
import io.netty.util.internal.StringUtil;
/**
@@ -134,7 +136,10 @@
if (properties.containsKey(f.getName())) {
try {
f.setAccessible(true);
- f.set(obj, value((String) properties.get(f.getName()), f));
+ String v = (String) properties.get(f.getName());
+ if (!StringUtils.isBlank(v)) {
+ f.set(obj, value(v, f));
+ }
} catch (Exception e) {
throw new IllegalArgumentException(format("failed to
initialize %s field while setting value %s",
f.getName(), properties.get(f.getName())), e);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services