This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ef37890  Allow to configure number of IO and managed ledger threads (#2412)
ef37890 is described below

commit ef37890331197ee4f40da9fa787ed5241a55cb6a
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Aug 22 10:33:49 2018 -0700

    Allow to configure number of IO and managed ledger threads (#2412)
---
 conf/broker.conf                                   |  9 +++
 conf/standalone.conf                               |  9 +++
 .../mledger/ManagedLedgerFactoryConfig.java        | 34 +++--------
 .../mledger/impl/ManagedLedgerFactoryImpl.java     | 66 ++++++++++------------
 .../apache/pulsar/broker/ServiceConfiguration.java | 33 +++++++++++
 .../pulsar/broker/ManagedLedgerClientFactory.java  |  2 +
 .../pulsar/broker/service/BrokerService.java       |  2 +-
 .../org/apache/pulsar/common/util/FieldParser.java |  7 ++-
 8 files changed, 100 insertions(+), 62 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 427d848..5465c9a 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -43,6 +43,9 @@ bindAddress=0.0.0.0
 # Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used.
 advertisedAddress=
 
+# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
+numIOThreads=
+
 # Name of the cluster to which this broker belongs to
 clusterName=
 
@@ -311,6 +314,12 @@ managedLedgerDefaultAckQuorum=2
 # Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
 managedLedgerDigestType=CRC32C
 
+# Number of threads to be used for managed ledger tasks dispatching
+managedLedgerNumWorkerThreads=8
+
+# Number of threads to be used for managed ledger scheduled tasks
+managedLedgerNumSchedulerThreads=8
+
 # Amount of memory to use for caching data payload in managed ledger. This memory
 # is allocated from JVM direct memory and it's shared across all the topics
 # running  in the same broker
diff --git a/conf/standalone.conf b/conf/standalone.conf
index cc03b95..13bef8a 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -36,6 +36,9 @@ bindAddress=0.0.0.0
 # Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used.
 advertisedAddress=
 
+# Number of threads to use for Netty IO. Default is set to 2 * Runtime.getRuntime().availableProcessors()
+numIOThreads=
+
 # Name of the cluster to which this broker belongs to
 clusterName=standalone
 
@@ -261,6 +264,12 @@ managedLedgerDefaultAckQuorum=1
 # Other possible options are "CRC32C" (which is faster), "MAC" or "DUMMY" (no checksum).
 managedLedgerDigestType=CRC32
 
+# Number of threads to be used for managed ledger tasks dispatching
+managedLedgerNumWorkerThreads=4
+
+# Number of threads to be used for managed ledger scheduled tasks
+managedLedgerNumSchedulerThreads=4
+
 # Amount of memory to use for caching data payload in managed ledger. This memory
 # is allocated from JVM direct memory and it's shared across all the topics
 # running  in the same broker
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index 6f1f41d..81673f4 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -18,42 +18,26 @@
  */
 package org.apache.bookkeeper.mledger;
 
+import lombok.Data;
+
 /**
  * Configuration for a {@link ManagedLedgerFactory}.
  */
+@Data
 public class ManagedLedgerFactoryConfig {
     private static final long MB = 1024 * 1024;
 
     private long maxCacheSize = 128 * MB;
-    private double cacheEvictionWatermark = 0.90;
-
-    public long getMaxCacheSize() {
-        return maxCacheSize;
-    }
 
     /**
-     *
-     * @param maxCacheSize
-     * @return
+     * The cache eviction watermark is the percentage of the cache size to reach when removing entries from the cache.
      */
-    public ManagedLedgerFactoryConfig setMaxCacheSize(long maxCacheSize) {
-        this.maxCacheSize = maxCacheSize;
-        return this;
-    }
+    private double cacheEvictionWatermark = 0.90;
 
-    public double getCacheEvictionWatermark() {
-        return cacheEvictionWatermark;
-    }
+    private int numManagedLedgerWorkerThreads = 
Runtime.getRuntime().availableProcessors();
+    private int numManagedLedgerSchedulerThreads = 
Runtime.getRuntime().availableProcessors();
 
-    /**
-     * The cache eviction watermark is the percentage of the cache size to reach when removing entries from the cache.
-     *
-     * @param cacheEvictionWatermark
-     * @return
-     */
-    public ManagedLedgerFactoryConfig setCacheEvictionWatermark(double 
cacheEvictionWatermark) {
-        this.cacheEvictionWatermark = cacheEvictionWatermark;
-        return this;
+    public long getMaxCacheSize() {
+        return maxCacheSize;
     }
-
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 6eb33c6..b9f63a8 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -19,7 +19,6 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
 
 import com.google.common.base.Predicates;
@@ -53,11 +52,11 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
-import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo.CursorInfo;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo.MessageRangeInfo;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo.PositionInfo;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.ReadOnlyCursor;
 import 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ManagedLedgerInitializeLedgerCallback;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
@@ -68,10 +67,9 @@ import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
 import org.apache.bookkeeper.mledger.util.Futures;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.pulsar.common.util.DateFormatter;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooKeeper.States;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,10 +79,8 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
     private final boolean isBookkeeperManaged;
     private final ZooKeeper zookeeper;
     private final ManagedLedgerFactoryConfig config;
-    protected final OrderedScheduler scheduledExecutor = 
OrderedScheduler.newSchedulerBuilder().numThreads(16)
-            .name("bookkeeper-ml-scheduler").build();
-    private final OrderedExecutor orderedExecutor = 
OrderedExecutor.newBuilder().numThreads(16)
-            .name("bookkeeper-ml-workers").build();
+    protected final OrderedScheduler scheduledExecutor;
+    private final OrderedExecutor orderedExecutor;
 
     protected final ManagedLedgerFactoryMBeanImpl mbean;
 
@@ -99,34 +95,20 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         this(bkClientConfiguration, new ManagedLedgerFactoryConfig());
     }
 
+    @SuppressWarnings("deprecation")
     public ManagedLedgerFactoryImpl(ClientConfiguration bkClientConfiguration, 
ManagedLedgerFactoryConfig config)
             throws Exception {
-        final CountDownLatch counter = new CountDownLatch(1);
-        final String zookeeperQuorum = 
checkNotNull(bkClientConfiguration.getZkServers());
-
-        zookeeper = new ZooKeeper(zookeeperQuorum, 
bkClientConfiguration.getZkTimeout(), event -> {
-            if 
(event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
-                log.info("Connected to zookeeper");
-                counter.countDown();
-            } else {
-                log.error("Error connecting to zookeeper {}", event);
-            }
-        });
-
-        if (!counter.await(bkClientConfiguration.getZkTimeout(), 
TimeUnit.MILLISECONDS)
-                || zookeeper.getState() != States.CONNECTED) {
-            throw new ManagedLedgerException("Error connecting to ZooKeeper at 
'" + zookeeperQuorum + "'");
-        }
-
-        this.bookKeeper = new BookKeeper(bkClientConfiguration, zookeeper);
-        this.isBookkeeperManaged = true;
-
-        this.store = new MetaStoreImplZookeeper(zookeeper, orderedExecutor);
+        this(ZooKeeperClient.newBuilder()
+                .connectString(bkClientConfiguration.getZkServers())
+                .sessionTimeoutMs(bkClientConfiguration.getZkTimeout())
+                .build(), bkClientConfiguration, config);
+    }
 
-        this.config = config;
-        this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
-        this.entryCacheManager = new EntryCacheManager(this);
-        this.statsTask = scheduledExecutor.scheduleAtFixedRate(() -> 
refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS);
+    private ManagedLedgerFactoryImpl(ZooKeeper zkc, ClientConfiguration 
bkClientConfiguration,
+            ManagedLedgerFactoryConfig config)
+            throws Exception {
+        this(new BookKeeper(bkClientConfiguration, zkc), true /* 
isBookkeeperManaged */, zkc,
+                config);
     }
 
     public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper 
zooKeeper) throws Exception {
@@ -135,9 +117,23 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
 
     public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper 
zooKeeper, ManagedLedgerFactoryConfig config)
             throws Exception {
+        this(bookKeeper, false /* isBookkeeperManaged */, zooKeeper, config);
+    }
+
+    private ManagedLedgerFactoryImpl(BookKeeper bookKeeper, boolean 
isBookkeeperManaged, ZooKeeper zooKeeper,
+            ManagedLedgerFactoryConfig config) throws Exception {
+        scheduledExecutor = OrderedScheduler.newSchedulerBuilder()
+                .numThreads(config.getNumManagedLedgerSchedulerThreads())
+                .name("bookkeeper-ml-scheduler")
+                .build();
+        orderedExecutor = OrderedExecutor.newBuilder()
+                .numThreads(config.getNumManagedLedgerWorkerThreads())
+                .name("bookkeeper-ml-workers")
+                .build();
+
         this.bookKeeper = bookKeeper;
-        this.isBookkeeperManaged = false;
-        this.zookeeper = null;
+        this.isBookkeeperManaged = isBookkeeperManaged;
+        this.zookeeper = isBookkeeperManaged ? zooKeeper : null;
         this.store = new MetaStoreImplZookeeper(zooKeeper, orderedExecutor);
         this.config = config;
         this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 9efa2a6..8ad0fc8 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -61,6 +61,9 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     // Controls which hostname is advertised to the discovery service via ZooKeeper.
     private String advertisedAddress;
 
+    // Number of threads to use for Netty IO
+    private int numIOThreads = 2 * Runtime.getRuntime().availableProcessors();
+
     // Enable the WebSocket API service
     private boolean webSocketServiceEnabled = false;
 
@@ -326,6 +329,12 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     private double managedLedgerCacheEvictionWatermark = 0.9f;
     // Rate limit the amount of writes per second generated by consumer acking the messages
     private double managedLedgerDefaultMarkDeleteRateLimit = 1.0;
+
+    // Number of threads to be used for managed ledger tasks dispatching
+    private int managedLedgerNumWorkerThreads = 
Runtime.getRuntime().availableProcessors();
+    // Number of threads to be used for managed ledger scheduled tasks
+    private int managedLedgerNumSchedulerThreads = 
Runtime.getRuntime().availableProcessors();
+
     // Max number of entries to append to a ledger before triggering a rollover
     // A ledger rollover is triggered on these conditions Either the max
     // rollover time has been reached or max entries have been written to the
@@ -579,6 +588,14 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         this.advertisedAddress = advertisedAddress;
     }
 
+    public int getNumIOThreads() {
+        return numIOThreads;
+    }
+
+    public void setNumIOThreads(int numIOThreads) {
+        this.numIOThreads = numIOThreads;
+    }
+
     public boolean isWebSocketServiceEnabled() {
         return webSocketServiceEnabled;
     }
@@ -1264,6 +1281,22 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         this.managedLedgerMaxUnackedRangesToPersistInZooKeeper = 
managedLedgerMaxUnackedRangesToPersistInZookeeper;
     }
 
+    public int getManagedLedgerNumWorkerThreads() {
+        return managedLedgerNumWorkerThreads;
+    }
+
+    public void setManagedLedgerNumWorkerThreads(int 
managedLedgerNumWorkerThreads) {
+        this.managedLedgerNumWorkerThreads = managedLedgerNumWorkerThreads;
+    }
+
+    public int getManagedLedgerNumSchedulerThreads() {
+        return managedLedgerNumSchedulerThreads;
+    }
+
+    public void setManagedLedgerNumSchedulerThreads(int 
managedLedgerNumSchedulerThreads) {
+        this.managedLedgerNumSchedulerThreads = 
managedLedgerNumSchedulerThreads;
+    }
+
     public boolean isAutoSkipNonRecoverableData() {
         return autoSkipNonRecoverableData;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 7eeb949..3189df2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -44,6 +44,8 @@ public class ManagedLedgerClientFactory implements Closeable {
         ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new 
ManagedLedgerFactoryConfig();
         
managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 
1024L * 1024L);
         
managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
+        
managedLedgerFactoryConfig.setNumManagedLedgerWorkerThreads(conf.getManagedLedgerNumWorkerThreads());
+        
managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(conf.getManagedLedgerNumSchedulerThreads());
 
         this.managedLedgerFactory = new ManagedLedgerFactoryImpl(bkClient, 
zkClient, managedLedgerFactoryConfig);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 42893a7..8620e82 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -209,7 +209,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
                 .name("broker-topic-workers").build();
         final DefaultThreadFactory acceptorThreadFactory = new 
DefaultThreadFactory("pulsar-acceptor");
         final DefaultThreadFactory workersThreadFactory = new 
DefaultThreadFactory("pulsar-io");
-        final int numThreads = Runtime.getRuntime().availableProcessors() * 2;
+        final int numThreads = pulsar.getConfiguration().getNumIOThreads();
         log.info("Using {} threads for broker service IO", numThreads);
 
         this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, 
acceptorThreadFactory);
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
index e476fc5..d344c30 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
@@ -33,6 +33,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.StringUtils;
+
 import io.netty.util.internal.StringUtil;
 
 /**
@@ -134,7 +136,10 @@ public final class FieldParser {
             if (properties.containsKey(f.getName())) {
                 try {
                     f.setAccessible(true);
-                    f.set(obj, value((String) properties.get(f.getName()), f));
+                    String v = (String) properties.get(f.getName());
+                    if (!StringUtils.isBlank(v)) {
+                        f.set(obj, value(v, f));
+                    }
                 } catch (Exception e) {
                     throw new IllegalArgumentException(format("failed to 
initialize %s field while setting value %s",
                             f.getName(), properties.get(f.getName())), e);

Reply via email to