This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ef37890 Allow to configure number of IO and managed ledger threads
(#2412)
ef37890 is described below
commit ef37890331197ee4f40da9fa787ed5241a55cb6a
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Aug 22 10:33:49 2018 -0700
Allow to configure number of IO and managed ledger threads (#2412)
---
conf/broker.conf | 9 +++
conf/standalone.conf | 9 +++
.../mledger/ManagedLedgerFactoryConfig.java | 34 +++--------
.../mledger/impl/ManagedLedgerFactoryImpl.java | 66 ++++++++++------------
.../apache/pulsar/broker/ServiceConfiguration.java | 33 +++++++++++
.../pulsar/broker/ManagedLedgerClientFactory.java | 2 +
.../pulsar/broker/service/BrokerService.java | 2 +-
.../org/apache/pulsar/common/util/FieldParser.java | 7 ++-
8 files changed, 100 insertions(+), 62 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 427d848..5465c9a 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -43,6 +43,9 @@ bindAddress=0.0.0.0
# Hostname or IP address the service advertises to the outside world. If not
set, the value of InetAddress.getLocalHost().getHostName() is used.
advertisedAddress=
+# Number of threads to use for Netty IO. Default is set to 2 *
Runtime.getRuntime().availableProcessors()
+numIOThreads=
+
# Name of the cluster to which this broker belongs to
clusterName=
@@ -311,6 +314,12 @@ managedLedgerDefaultAckQuorum=2
# Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
managedLedgerDigestType=CRC32C
+# Number of threads to be used for managed ledger tasks dispatching
+managedLedgerNumWorkerThreads=8
+
+# Number of threads to be used for managed ledger scheduled tasks
+managedLedgerNumSchedulerThreads=8
+
# Amount of memory to use for caching data payload in managed ledger. This
memory
# is allocated from JVM direct memory and it's shared across all the topics
# running in the same broker
diff --git a/conf/standalone.conf b/conf/standalone.conf
index cc03b95..13bef8a 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -36,6 +36,9 @@ bindAddress=0.0.0.0
# Hostname or IP address the service advertises to the outside world. If not
set, the value of InetAddress.getLocalHost().getHostName() is used.
advertisedAddress=
+# Number of threads to use for Netty IO. Default is set to 2 *
Runtime.getRuntime().availableProcessors()
+numIOThreads=
+
# Name of the cluster to which this broker belongs to
clusterName=standalone
@@ -261,6 +264,12 @@ managedLedgerDefaultAckQuorum=1
# Other possible options are "CRC32C" (which is faster), "MAC" or "DUMMY" (no
checksum).
managedLedgerDigestType=CRC32
+# Number of threads to be used for managed ledger tasks dispatching
+managedLedgerNumWorkerThreads=4
+
+# Number of threads to be used for managed ledger scheduled tasks
+managedLedgerNumSchedulerThreads=4
+
# Amount of memory to use for caching data payload in managed ledger. This
memory
# is allocated from JVM direct memory and it's shared across all the topics
# running in the same broker
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index 6f1f41d..81673f4 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -18,42 +18,26 @@
*/
package org.apache.bookkeeper.mledger;
+import lombok.Data;
+
/**
* Configuration for a {@link ManagedLedgerFactory}.
*/
+@Data
public class ManagedLedgerFactoryConfig {
private static final long MB = 1024 * 1024;
private long maxCacheSize = 128 * MB;
- private double cacheEvictionWatermark = 0.90;
-
- public long getMaxCacheSize() {
- return maxCacheSize;
- }
/**
- *
- * @param maxCacheSize
- * @return
+ * The cache eviction watermark is the percentage of the cache size to
reach when removing entries from the cache.
*/
- public ManagedLedgerFactoryConfig setMaxCacheSize(long maxCacheSize) {
- this.maxCacheSize = maxCacheSize;
- return this;
- }
+ private double cacheEvictionWatermark = 0.90;
- public double getCacheEvictionWatermark() {
- return cacheEvictionWatermark;
- }
+ private int numManagedLedgerWorkerThreads =
Runtime.getRuntime().availableProcessors();
+ private int numManagedLedgerSchedulerThreads =
Runtime.getRuntime().availableProcessors();
- /**
- * The cache eviction watermark is the percentage of the cache size to
reach when removing entries from the cache.
- *
- * @param cacheEvictionWatermark
- * @return
- */
- public ManagedLedgerFactoryConfig setCacheEvictionWatermark(double
cacheEvictionWatermark) {
- this.cacheEvictionWatermark = cacheEvictionWatermark;
- return this;
+ public long getMaxCacheSize() {
+ return maxCacheSize;
}
-
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 6eb33c6..b9f63a8 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -19,7 +19,6 @@
package org.apache.bookkeeper.mledger.impl;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
import static
org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
import com.google.common.base.Predicates;
@@ -53,11 +52,11 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
-import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.CursorInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.MessageRangeInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.PositionInfo;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ManagedLedgerInitializeLedgerCallback;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
@@ -68,10 +67,9 @@ import
org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.util.Futures;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.pulsar.common.util.DateFormatter;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooKeeper.States;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,10 +79,8 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
private final boolean isBookkeeperManaged;
private final ZooKeeper zookeeper;
private final ManagedLedgerFactoryConfig config;
- protected final OrderedScheduler scheduledExecutor =
OrderedScheduler.newSchedulerBuilder().numThreads(16)
- .name("bookkeeper-ml-scheduler").build();
- private final OrderedExecutor orderedExecutor =
OrderedExecutor.newBuilder().numThreads(16)
- .name("bookkeeper-ml-workers").build();
+ protected final OrderedScheduler scheduledExecutor;
+ private final OrderedExecutor orderedExecutor;
protected final ManagedLedgerFactoryMBeanImpl mbean;
@@ -99,34 +95,20 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
this(bkClientConfiguration, new ManagedLedgerFactoryConfig());
}
+ @SuppressWarnings("deprecation")
public ManagedLedgerFactoryImpl(ClientConfiguration bkClientConfiguration,
ManagedLedgerFactoryConfig config)
throws Exception {
- final CountDownLatch counter = new CountDownLatch(1);
- final String zookeeperQuorum =
checkNotNull(bkClientConfiguration.getZkServers());
-
- zookeeper = new ZooKeeper(zookeeperQuorum,
bkClientConfiguration.getZkTimeout(), event -> {
- if
(event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
- log.info("Connected to zookeeper");
- counter.countDown();
- } else {
- log.error("Error connecting to zookeeper {}", event);
- }
- });
-
- if (!counter.await(bkClientConfiguration.getZkTimeout(),
TimeUnit.MILLISECONDS)
- || zookeeper.getState() != States.CONNECTED) {
- throw new ManagedLedgerException("Error connecting to ZooKeeper at
'" + zookeeperQuorum + "'");
- }
-
- this.bookKeeper = new BookKeeper(bkClientConfiguration, zookeeper);
- this.isBookkeeperManaged = true;
-
- this.store = new MetaStoreImplZookeeper(zookeeper, orderedExecutor);
+ this(ZooKeeperClient.newBuilder()
+ .connectString(bkClientConfiguration.getZkServers())
+ .sessionTimeoutMs(bkClientConfiguration.getZkTimeout())
+ .build(), bkClientConfiguration, config);
+ }
- this.config = config;
- this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
- this.entryCacheManager = new EntryCacheManager(this);
- this.statsTask = scheduledExecutor.scheduleAtFixedRate(() ->
refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS);
+ private ManagedLedgerFactoryImpl(ZooKeeper zkc, ClientConfiguration
bkClientConfiguration,
+ ManagedLedgerFactoryConfig config)
+ throws Exception {
+ this(new BookKeeper(bkClientConfiguration, zkc), true /*
isBookkeeperManaged */, zkc,
+ config);
}
public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper
zooKeeper) throws Exception {
@@ -135,9 +117,23 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper
zooKeeper, ManagedLedgerFactoryConfig config)
throws Exception {
+ this(bookKeeper, false /* isBookkeeperManaged */, zooKeeper, config);
+ }
+
+ private ManagedLedgerFactoryImpl(BookKeeper bookKeeper, boolean
isBookkeeperManaged, ZooKeeper zooKeeper,
+ ManagedLedgerFactoryConfig config) throws Exception {
+ scheduledExecutor = OrderedScheduler.newSchedulerBuilder()
+ .numThreads(config.getNumManagedLedgerSchedulerThreads())
+ .name("bookkeeper-ml-scheduler")
+ .build();
+ orderedExecutor = OrderedExecutor.newBuilder()
+ .numThreads(config.getNumManagedLedgerWorkerThreads())
+ .name("bookkeeper-ml-workers")
+ .build();
+
this.bookKeeper = bookKeeper;
- this.isBookkeeperManaged = false;
- this.zookeeper = null;
+ this.isBookkeeperManaged = isBookkeeperManaged;
+ this.zookeeper = isBookkeeperManaged ? zooKeeper : null;
this.store = new MetaStoreImplZookeeper(zooKeeper, orderedExecutor);
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 9efa2a6..8ad0fc8 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -61,6 +61,9 @@ public class ServiceConfiguration implements
PulsarConfiguration {
// Controls which hostname is advertised to the discovery service via
ZooKeeper.
private String advertisedAddress;
+ // Number of threads to use for Netty IO
+ private int numIOThreads = 2 * Runtime.getRuntime().availableProcessors();
+
// Enable the WebSocket API service
private boolean webSocketServiceEnabled = false;
@@ -326,6 +329,12 @@ public class ServiceConfiguration implements
PulsarConfiguration {
private double managedLedgerCacheEvictionWatermark = 0.9f;
// Rate limit the amount of writes per second generated by consumer acking
the messages
private double managedLedgerDefaultMarkDeleteRateLimit = 1.0;
+
+ // Number of threads to be used for managed ledger tasks dispatching
+ private int managedLedgerNumWorkerThreads =
Runtime.getRuntime().availableProcessors();
+ // Number of threads to be used for managed ledger scheduled tasks
+ private int managedLedgerNumSchedulerThreads =
Runtime.getRuntime().availableProcessors();
+
// Max number of entries to append to a ledger before triggering a rollover
// A ledger rollover is triggered on these conditions Either the max
// rollover time has been reached or max entries have been written to the
@@ -579,6 +588,14 @@ public class ServiceConfiguration implements
PulsarConfiguration {
this.advertisedAddress = advertisedAddress;
}
+ public int getNumIOThreads() {
+ return numIOThreads;
+ }
+
+ public void setNumIOThreads(int numIOThreads) {
+ this.numIOThreads = numIOThreads;
+ }
+
public boolean isWebSocketServiceEnabled() {
return webSocketServiceEnabled;
}
@@ -1264,6 +1281,22 @@ public class ServiceConfiguration implements
PulsarConfiguration {
this.managedLedgerMaxUnackedRangesToPersistInZooKeeper =
managedLedgerMaxUnackedRangesToPersistInZookeeper;
}
+ public int getManagedLedgerNumWorkerThreads() {
+ return managedLedgerNumWorkerThreads;
+ }
+
+ public void setManagedLedgerNumWorkerThreads(int
managedLedgerNumWorkerThreads) {
+ this.managedLedgerNumWorkerThreads = managedLedgerNumWorkerThreads;
+ }
+
+ public int getManagedLedgerNumSchedulerThreads() {
+ return managedLedgerNumSchedulerThreads;
+ }
+
+ public void setManagedLedgerNumSchedulerThreads(int
managedLedgerNumSchedulerThreads) {
+ this.managedLedgerNumSchedulerThreads =
managedLedgerNumSchedulerThreads;
+ }
+
public boolean isAutoSkipNonRecoverableData() {
return autoSkipNonRecoverableData;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 7eeb949..3189df2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -44,6 +44,8 @@ public class ManagedLedgerClientFactory implements Closeable {
ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new
ManagedLedgerFactoryConfig();
managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() *
1024L * 1024L);
managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
+
managedLedgerFactoryConfig.setNumManagedLedgerWorkerThreads(conf.getManagedLedgerNumWorkerThreads());
+
managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(conf.getManagedLedgerNumSchedulerThreads());
this.managedLedgerFactory = new ManagedLedgerFactoryImpl(bkClient,
zkClient, managedLedgerFactoryConfig);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 42893a7..8620e82 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -209,7 +209,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
.name("broker-topic-workers").build();
final DefaultThreadFactory acceptorThreadFactory = new
DefaultThreadFactory("pulsar-acceptor");
final DefaultThreadFactory workersThreadFactory = new
DefaultThreadFactory("pulsar-io");
- final int numThreads = Runtime.getRuntime().availableProcessors() * 2;
+ final int numThreads = pulsar.getConfiguration().getNumIOThreads();
log.info("Using {} threads for broker service IO", numThreads);
this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1,
acceptorThreadFactory);
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
index e476fc5..d344c30 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
@@ -33,6 +33,8 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+
import io.netty.util.internal.StringUtil;
/**
@@ -134,7 +136,10 @@ public final class FieldParser {
if (properties.containsKey(f.getName())) {
try {
f.setAccessible(true);
- f.set(obj, value((String) properties.get(f.getName()), f));
+ String v = (String) properties.get(f.getName());
+ if (!StringUtils.isBlank(v)) {
+ f.set(obj, value(v, f));
+ }
} catch (Exception e) {
throw new IllegalArgumentException(format("failed to
initialize %s field while setting value %s",
f.getName(), properties.get(f.getName())), e);