This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 904c91f4c92 [improve][broker] Able to use separated IO threads for
BookKeeper Client (#16333)
904c91f4c92 is described below
commit 904c91f4c92b4307b8acbce4538199e8430c9bc3
Author: lipenghui <[email protected]>
AuthorDate: Mon Jul 4 09:33:15 2022 +0800
[improve][broker] Able to use separated IO threads for BookKeeper Client
(#16333)
### Motivation
The broker always uses the broker IO thread pool as the BookKeeper client IO
thread pool.
This PR adds the ability to use a separate IO thread pool for the BookKeeper
client.
### Modification
Introduces two new configuration options; the default behavior is unchanged.
```
# Number of BookKeeper client IO threads
# Default is Runtime.getRuntime().availableProcessors() * 2
bookkeeperClientNumIoThreads=
# Use separated IO threads for BookKeeper client
# Default is false, which will use Pulsar IO threads
bookkeeperClientSeparatedIoThreadsEnabled=false
```
---
conf/broker.conf | 8 ++++++++
conf/standalone.conf | 8 ++++++++
.../apache/pulsar/broker/ServiceConfiguration.java | 11 +++++++++++
.../pulsar/broker/BookKeeperClientFactoryImpl.java | 19 +++++++++++++-----
.../broker/BookKeeperClientFactoryImplTest.java | 23 ++++++++++++++++++++++
.../common/naming/ServiceConfigurationTest.java | 16 +++++++++++++++
site2/docs/reference-configuration.md | 2 ++
7 files changed, 82 insertions(+), 5 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 91e20059296..d93811d04a7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -845,6 +845,14 @@ bookkeeperClientTimeoutInSeconds=30
# Default is Runtime.getRuntime().availableProcessors()
bookkeeperClientNumWorkerThreads=
+# Number of BookKeeper client IO threads
+# Default is Runtime.getRuntime().availableProcessors() * 2
+bookkeeperClientNumIoThreads=
+
+# Use separated IO threads for BookKeeper client
+# Default is false, which will use Pulsar IO threads
+bookkeeperClientSeparatedIoThreadsEnabled=false
+
# Speculative reads are initiated if a read request doesn't complete within a
certain time
# Using a value of 0, is disabling the speculative reads
bookkeeperClientSpeculativeReadTimeoutInMillis=0
diff --git a/conf/standalone.conf b/conf/standalone.conf
index a47692ef454..7796e3e41e8 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -546,6 +546,14 @@ bookkeeperClientTimeoutInSeconds=30
# Default is Runtime.getRuntime().availableProcessors()
bookkeeperClientNumWorkerThreads=
+# Number of BookKeeper client IO threads
+# Default is Runtime.getRuntime().availableProcessors() * 2
+bookkeeperClientNumIoThreads=
+
+# Use separated IO threads for BookKeeper client
+# Default is false, which will use Pulsar IO threads
+bookkeeperClientSeparatedIoThreadsEnabled=false
+
# Speculative reads are initiated if a read request doesn't complete within a
certain time
# Using a value of 0, is disabling the speculative reads
bookkeeperClientSpeculativeReadTimeoutInMillis=0
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index fd138abfce6..43ef83f54ed 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1683,6 +1683,17 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private int bookkeeperClientNumWorkerThreads =
Runtime.getRuntime().availableProcessors();
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Number of BookKeeper client IO threads. Default is
Runtime.getRuntime().availableProcessors() * 2"
+ )
+ private int bookkeeperClientNumIoThreads =
Runtime.getRuntime().availableProcessors() * 2;
+
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Use separated IO threads for BookKeeper client. Default is
false, which will use Pulsar IO threads"
+ )
+ private boolean bookkeeperClientSeparatedIoThreadsEnabled = false;
/**** --- Managed Ledger. --- ****/
@FieldContext(
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 62fdd20b1ac..f3d69202a08 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -80,16 +80,24 @@ public class BookKeeperClientFactoryImpl implements
BookKeeperClientFactory {
setDefaultEnsemblePlacementPolicy(bkConf, conf, store);
}
try {
- return BookKeeper.forConfig(bkConf)
- .allocator(PulsarByteBufAllocator.DEFAULT)
- .eventLoopGroup(eventLoopGroup)
- .statsLogger(statsLogger)
- .build();
+ return getBookKeeperBuilder(conf, eventLoopGroup, statsLogger,
bkConf).build();
} catch (InterruptedException | BKException e) {
throw new IOException(e);
}
}
+ @VisibleForTesting
+ BookKeeper.Builder getBookKeeperBuilder(ServiceConfiguration conf,
EventLoopGroup eventLoopGroup,
+ StatsLogger statsLogger,
ClientConfiguration bkConf) {
+ BookKeeper.Builder builder = BookKeeper.forConfig(bkConf)
+ .allocator(PulsarByteBufAllocator.DEFAULT)
+ .statsLogger(statsLogger);
+ if (!conf.isBookkeeperClientSeparatedIoThreadsEnabled()) {
+ builder.eventLoopGroup(eventLoopGroup);
+ }
+ return builder;
+ }
+
@VisibleForTesting
ClientConfiguration createBkClientConfiguration(MetadataStoreExtended
store, ServiceConfiguration conf) {
ClientConfiguration bkConf = new ClientConfiguration();
@@ -150,6 +158,7 @@ public class BookKeeperClientFactoryImpl implements
BookKeeperClientFactory {
conf.getBookkeeperClientGetBookieInfoIntervalSeconds(),
TimeUnit.SECONDS);
bkConf.setGetBookieInfoRetryIntervalSeconds(
conf.getBookkeeperClientGetBookieInfoRetryIntervalSeconds(),
TimeUnit.SECONDS);
+ bkConf.setNumIOThreads(conf.getBookkeeperClientNumIoThreads());
PropertiesUtils.filterAndMapProperties(conf.getProperties(),
"bookkeeper_")
.forEach((key, value) -> {
log.info("Applying BookKeeper client configuration setting
{}={}", key, value);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
index 0cef2fc45c7..e26b0aa7561 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
@@ -31,12 +31,16 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import io.netty.channel.EventLoopGroup;
+import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.CachedDNSToSwitchMapping;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.powermock.reflect.Whitebox;
import org.testng.annotations.Test;
/**
@@ -276,4 +280,23 @@ public class BookKeeperClientFactoryImplTest {
}
+ @Test
+ public void testBookKeeperIoThreadsConfiguration() {
+ BookKeeperClientFactoryImpl factory = new
BookKeeperClientFactoryImpl();
+ ServiceConfiguration conf = new ServiceConfiguration();
+
assertEquals(factory.createBkClientConfiguration(mock(MetadataStoreExtended.class),
conf)
+ .getNumIOThreads(), Runtime.getRuntime().availableProcessors()
* 2);
+ conf.setBookkeeperClientNumIoThreads(1);
+
assertEquals(factory.createBkClientConfiguration(mock(MetadataStoreExtended.class),
conf)
+ .getNumIOThreads(), 1);
+ EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
+ BookKeeper.Builder builder = factory.getBookKeeperBuilder(conf,
eventLoopGroup,
+ mock(StatsLogger.class), mock(ClientConfiguration.class));
+ assertEquals(Whitebox.getInternalState(builder, "eventLoopGroup"),
eventLoopGroup);
+ conf.setBookkeeperClientSeparatedIoThreadsEnabled(true);
+ builder = factory.getBookKeeperBuilder(conf, eventLoopGroup,
+ mock(StatsLogger.class), mock(ClientConfiguration.class));
+ assertNull(Whitebox.getInternalState(builder, "eventLoopGroup"));
+ }
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index fc3fbc8981b..97202866fb5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -237,4 +237,20 @@ public class ServiceConfigurationTest {
}
}
+
+ @Test
+ public void testBookKeeperClientIoThreads() throws Exception {
+ try (FileInputStream stream = new
FileInputStream("../conf/broker.conf")) {
+ final ServiceConfiguration fileConfig =
PulsarConfigurationLoader.create(stream, ServiceConfiguration.class);
+
assertFalse(fileConfig.isBookkeeperClientSeparatedIoThreadsEnabled());
+ assertEquals(fileConfig.getBookkeeperClientNumIoThreads(),
Runtime.getRuntime().availableProcessors() * 2);
+ }
+ String confFile = "bookkeeperClientNumIoThreads=1\n" +
+ "bookkeeperClientSeparatedIoThreadsEnabled=true\n";
+ try (InputStream stream = new
ByteArrayInputStream(confFile.getBytes())) {
+ final ServiceConfiguration conf =
PulsarConfigurationLoader.create(stream, ServiceConfiguration.class);
+ assertTrue(conf.isBookkeeperClientSeparatedIoThreadsEnabled());
+ assertEquals(conf.getBookkeeperClientNumIoThreads(), 1);
+ }
+ }
}
diff --git a/site2/docs/reference-configuration.md
b/site2/docs/reference-configuration.md
index ab20f653dde..b956314a53c 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -269,6 +269,8 @@ brokerServiceCompactionThresholdInBytes|If the estimated
backlog size is greater
|bookkeeperClientAuthenticationParametersName| BookKeeper auth plugin
implementation specifics parameters name and values ||
|bookkeeperClientAuthenticationParameters|||
|bookkeeperClientNumWorkerThreads| Number of BookKeeper client worker
threads. Default is Runtime.getRuntime().availableProcessors() ||
+|bookkeeperClientNumIoThreads| Number of BookKeeper client IO threads.
Default is Runtime.getRuntime().availableProcessors() * 2 ||
+|bookkeeperClientSeparatedIoThreadsEnabled| Use separated IO threads for
BookKeeper client. Default is false, which will use Pulsar IO threads ||
|bookkeeperClientTimeoutInSeconds| Timeout for BK add / read operations |30|
|bookkeeperClientSpeculativeReadTimeoutInMillis| Speculative reads are
initiated if a read request doesn’t complete within a certain time Using a
value of 0, is disabling the speculative reads |0|
|bookkeeperNumberOfChannelsPerBookie| Number of channels per bookie |16|