This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new b150ded99b8 [improve] [broker] Separate offload read and write thread
pool (#24025)
b150ded99b8 is described below
commit b150ded99b87d794b3fbe30be6ef631d0ec11dd6
Author: Hang Chen <[email protected]>
AuthorDate: Tue Feb 25 19:27:34 2025 -0800
[improve] [broker] Separate offload read and write thread pool (#24025)
---
conf/broker.conf | 3 ++
.../bookkeeper/mledger/LedgerOffloaderFactory.java | 45 ++++++++++++++++++++++
.../apache/pulsar/broker/ServiceConfiguration.java | 6 +++
.../org/apache/pulsar/broker/PulsarService.java | 14 ++++++-
.../common/policies/data/OffloadPolicies.java | 2 +
.../common/policies/data/OffloadPoliciesImpl.java | 9 +++++
.../jcloud/JCloudLedgerOffloaderFactory.java | 14 ++++++-
.../impl/BlobStoreManagedLedgerOffloader.java | 16 +++++---
...obStoreManagedLedgerOffloaderStreamingTest.java | 4 +-
.../impl/BlobStoreManagedLedgerOffloaderTest.java | 4 +-
10 files changed, 105 insertions(+), 12 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 9bd33571c29..08cd7199275 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1696,6 +1696,9 @@ managedLedgerOffloadDriver=
# Maximum number of thread pool threads for ledger offloading
managedLedgerOffloadMaxThreads=2
+# Maximum number of threads in the thread pool used for reading offloaded ledgers
+managedLedgerOffloadReadThreads=2
+
# The extraction directory of the nar package.
# Available for Protocol Handler, Additional Servlets, Entry Filter, Offloaders, Broker Interceptor.
# Default is System.getProperty("java.io.tmpdir").
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
index 9fbf9b73c05..80135925913 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
@@ -72,6 +72,27 @@ public interface LedgerOffloaderFactory<T extends
LedgerOffloader> extends AutoC
LedgerOffloaderStats offloaderStats)
throws IOException;
+ /**
+ * Create a ledger offloader with the provided configuration, user-metadata,
+ * scheduler, readExecutor and offloaderStats.
+ *
+ * <p>This default implementation does not use {@code readExecutor}: it delegates to
+ * {@code create(offloadPolicies, userMetadata, scheduler, offloaderStats)}. A factory
+ * must override this method for the read executor to reach the offloader it builds.
+ *
+ * @param offloadPolicies offload policies
+ * @param userMetadata user metadata
+ * @param scheduler scheduler
+ * @param readExecutor read executor; ignored by this default implementation
+ * @param offloaderStats offloaderStats
+ * @return the offloader instance
+ * @throws IOException if the offloader cannot be created
+ */
+ default T create(OffloadPoliciesImpl offloadPolicies,
+ Map<String, String> userMetadata,
+ OrderedScheduler scheduler,
+ OrderedScheduler readExecutor,
+ LedgerOffloaderStats offloaderStats)
+ throws IOException {
+ return create(offloadPolicies, userMetadata, scheduler,
offloaderStats);
+ }
+
/**
* Create a ledger offloader with the provided configuration,
user-metadata, schema storage and scheduler.
@@ -112,6 +133,30 @@ public interface LedgerOffloaderFactory<T extends
LedgerOffloader> extends AutoC
return create(offloadPolicies, userMetadata, scheduler,
offloaderStats);
}
+
+ /**
+ * Create a ledger offloader with the provided configuration, user-metadata, schema storage,
+ * scheduler, readExecutor and offloaderStats.
+ *
+ * <p>This default implementation does not use {@code schemaStorage}: it delegates to
+ * {@code create(offloadPolicies, userMetadata, scheduler, readExecutor, offloaderStats)}.
+ *
+ * @param offloadPolicies offload policies
+ * @param userMetadata user metadata
+ * @param schemaStorage used for schema lookup in offloader; ignored by this default implementation
+ * @param scheduler scheduler
+ * @param readExecutor read executor
+ * @param offloaderStats offloaderStats
+ * @return the offloader instance
+ * @throws IOException if the offloader cannot be created
+ */
+ default T create(OffloadPoliciesImpl offloadPolicies,
+ Map<String, String> userMetadata,
+ SchemaStorage schemaStorage,
+ OrderedScheduler scheduler,
+ OrderedScheduler readExecutor,
+ LedgerOffloaderStats offloaderStats)
+ throws IOException {
+ return create(offloadPolicies, userMetadata, scheduler, readExecutor,
offloaderStats);
+ }
+
@Override
default void close() throws Exception {
// no-op
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index cf35e5851f4..955baaf73b9 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3187,6 +3187,12 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private int managedLedgerOffloadMaxThreads = 2;
+ @FieldContext(
+ category = CATEGORY_STORAGE_OFFLOADING,
+ doc = "Maximum number of thread pool threads for offloaded ledger
reading"
+ )
+ private int managedLedgerOffloadReadThreads = 2;
+
@FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
doc = "The directory where nar Extraction of offloaders happens"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 1b4de124d08..cb63d1db71d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -219,6 +219,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
private final ScheduledExecutorService loadManagerExecutor;
private ScheduledExecutorService compactorExecutor;
private OrderedScheduler offloaderScheduler;
+ private OrderedScheduler offloaderReadExecutor;
private OffloadersCache offloadersCache = new OffloadersCache();
private LedgerOffloader defaultOffloader;
private LedgerOffloaderStats offloaderStats;
@@ -597,6 +598,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
executorServicesShutdown.shutdown(compactorExecutor);
executorServicesShutdown.shutdown(offloaderScheduler);
+ executorServicesShutdown.shutdown(offloaderReadExecutor);
executorServicesShutdown.shutdown(executor);
executorServicesShutdown.shutdown(orderedExecutor);
@@ -1525,7 +1527,8 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
LedgerOffloader.METADATA_PULSAR_CLUSTER_NAME.toLowerCase(),
config.getClusterName()
),
- schemaStorage,
getOffloaderScheduler(offloadPolicies), this.offloaderStats);
+ schemaStorage,
getOffloaderScheduler(offloadPolicies),
+ getOffloaderReadScheduler(offloadPolicies),
this.offloaderStats);
} catch (IOException ioe) {
throw new PulsarServerException(ioe.getMessage(),
ioe.getCause());
}
@@ -1597,6 +1600,15 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
return this.offloaderScheduler;
}
+ protected synchronized OrderedScheduler
getOffloaderReadScheduler(OffloadPoliciesImpl offloadPolicies) { // returns the single offloader read scheduler held by this service
+ if (this.offloaderReadExecutor == null) { // first call only: build it; the method is synchronized, so concurrent callers cannot both build it
+ this.offloaderReadExecutor = OrderedScheduler.newSchedulerBuilder()
+
.numThreads(offloadPolicies.getManagedLedgerOffloadReadThreads())
+ .name("offloader-read").build(); // thread count comes from the policies passed on that first call
+ }
+ return this.offloaderReadExecutor; // later calls return the existing instance; their offloadPolicies argument is not consulted
+ }
+
public PulsarClientImpl createClientImpl(ClientConfigurationData
clientConf)
throws PulsarClientException {
return PulsarClientImpl.builder()
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index 9073de658d4..924153e71ff 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -87,6 +87,8 @@ public interface OffloadPolicies {
Builder managedLedgerOffloadMaxThreads(Integer
managedLedgerOffloadMaxThreads);
+ Builder managedLedgerOffloadReadThreads(Integer
managedLedgerOffloadReadThreads);
+
Builder managedLedgerOffloadPrefetchRounds(Integer
managedLedgerOffloadPrefetchRounds);
Builder managedLedgerOffloadThresholdInBytes(Long
managedLedgerOffloadThresholdInBytes);
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index 6c40aa3f2ed..910075e3870 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -83,6 +83,7 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
public static final int DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES = 128 * 1024 *
1024; // 128MiB
public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;
// 1MiB
public static final int DEFAULT_OFFLOAD_MAX_THREADS = 2;
+ public static final int DEFAULT_OFFLOAD_READ_THREADS = 2;
public static final int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
public static final String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
@@ -109,6 +110,9 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
private Integer managedLedgerOffloadMaxThreads =
DEFAULT_OFFLOAD_MAX_THREADS;
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
+ private Integer managedLedgerOffloadReadThreads =
DEFAULT_OFFLOAD_READ_THREADS;
+ @Configuration
+ @JsonProperty(access = JsonProperty.Access.READ_WRITE)
private Integer managedLedgerOffloadPrefetchRounds =
DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS;
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
@@ -500,6 +504,11 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
return this;
}
+ public OffloadPoliciesImplBuilder
managedLedgerOffloadReadThreads(Integer managedLedgerOffloadReadThreads) { // builder setter for the offload read-thread count
+ impl.managedLedgerOffloadReadThreads =
managedLedgerOffloadReadThreads; // stored as given; no null or range check is applied here
+ return this; // returns the builder so calls can be chained
+ }
+
public OffloadPoliciesImplBuilder managedLedgerOffloadPrefetchRounds(
Integer managedLedgerOffloadPrefetchRounds) {
impl.managedLedgerOffloadPrefetchRounds =
managedLedgerOffloadPrefetchRounds;
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
index 60363cf8406..ef2db046e3c 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
@@ -54,7 +54,19 @@ public class JCloudLedgerOffloaderFactory implements
LedgerOffloaderFactory<Blob
TieredStorageConfiguration config =
TieredStorageConfiguration.create(offloadPolicies.toProperties());
- return BlobStoreManagedLedgerOffloader.create(config, userMetadata,
scheduler, offloaderStats,
+ return BlobStoreManagedLedgerOffloader.create(config, userMetadata,
scheduler, scheduler, offloaderStats,
+ entryOffsetsCache);
+ }
+
+ @Override
+ public BlobStoreManagedLedgerOffloader create(OffloadPoliciesImpl
offloadPolicies, Map<String, String> userMetadata,
+ OrderedScheduler scheduler,
+ OrderedScheduler
readExecutor,
+ LedgerOffloaderStats
offloaderStats) throws IOException { // overload that accepts a dedicated read executor
+
+ TieredStorageConfiguration config =
+
TieredStorageConfiguration.create(offloadPolicies.toProperties()); // tiered-storage config built from the policies' properties
+ return BlobStoreManagedLedgerOffloader.create(config, userMetadata,
scheduler, readExecutor, offloaderStats,
entryOffsetsCache); // scheduler and readExecutor are forwarded as two separate arguments
}
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 9f89bd52a86..48b7787991c 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -97,6 +97,7 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
private static final String MANAGED_LEDGER_NAME = "ManagedLedgerName";
private final OrderedScheduler scheduler;
+ private final OrderedScheduler readExecutor;
private final TieredStorageConfiguration config;
private final Location writeLocation;
@@ -124,18 +125,21 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
public static BlobStoreManagedLedgerOffloader
create(TieredStorageConfiguration config,
Map<String, String>
userMetadata,
OrderedScheduler
scheduler,
+ OrderedScheduler
readExecutor,
LedgerOffloaderStats
offloaderStats,
OffsetsCache
entryOffsetsCache)
throws IOException {
- return new BlobStoreManagedLedgerOffloader(config, scheduler,
userMetadata, offloaderStats, entryOffsetsCache);
+ return new BlobStoreManagedLedgerOffloader(config, scheduler,
readExecutor,
+ userMetadata, offloaderStats, entryOffsetsCache);
}
BlobStoreManagedLedgerOffloader(TieredStorageConfiguration config,
OrderedScheduler scheduler,
+ OrderedScheduler readExecutor,
Map<String, String> userMetadata,
LedgerOffloaderStats offloaderStats,
OffsetsCache entryOffsetsCache) {
-
this.scheduler = scheduler;
+ this.readExecutor = readExecutor;
this.userMetadata = userMetadata;
this.config = config;
this.streamingBlockSize = config.getMinBlockSizeInBytes();
@@ -552,10 +556,10 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
String key = DataBlockUtils.dataBlockOffloadKey(ledgerId, uid);
String indexKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uid);
- scheduler.chooseThread(ledgerId).execute(() -> {
+ readExecutor.chooseThread(ledgerId).execute(() -> {
try {
BlobStore readBlobstore =
getBlobStore(config.getBlobStoreLocation());
-
promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
+
promise.complete(BlobStoreBackedReadHandleImpl.open(readExecutor.chooseThread(ledgerId),
readBlobstore,
readBucket, key, indexKey,
DataBlockUtils.VERSION_CHECK,
@@ -587,10 +591,10 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
indexKeys.add(indexKey);
});
- scheduler.chooseThread(ledgerId).execute(() -> {
+ readExecutor.chooseThread(ledgerId).execute(() -> {
try {
BlobStore readBlobstore =
getBlobStore(config.getBlobStoreLocation());
-
promise.complete(BlobStoreBackedReadHandleImplV2.open(scheduler.chooseThread(ledgerId),
+
promise.complete(BlobStoreBackedReadHandleImplV2.open(readExecutor.chooseThread(ledgerId),
readBlobstore,
readBucket, keys, indexKeys,
DataBlockUtils.VERSION_CHECK,
diff --git
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
index e706e4254cb..67dcb73b9e5 100644
---
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
+++
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
@@ -82,7 +82,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest
extends BlobStoreManag
mockedConfig = mock(TieredStorageConfiguration.class,
delegatesTo(getConfiguration(bucket, additionalConfig)));
Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use
the REAL blobStore
BlobStoreManagedLedgerOffloader offloader =
BlobStoreManagedLedgerOffloader
- .create(mockedConfig, new HashMap<String, String>(),
scheduler, this.offloaderStats, entryOffsetsCache);
+ .create(mockedConfig, new HashMap<String, String>(),
scheduler, scheduler, this.offloaderStats, entryOffsetsCache);
return offloader;
}
@@ -91,7 +91,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest
extends BlobStoreManag
mockedConfig = mock(TieredStorageConfiguration.class,
delegatesTo(getConfiguration(bucket, additionalConfig)));
Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
BlobStoreManagedLedgerOffloader offloader =
BlobStoreManagedLedgerOffloader
- .create(mockedConfig, new HashMap<String, String>(),
scheduler, this.offloaderStats, entryOffsetsCache);
+ .create(mockedConfig, new HashMap<String, String>(),
scheduler, scheduler, this.offloaderStats, entryOffsetsCache);
return offloader;
}
diff --git
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index bf6ede896ab..c8bcb67aa2c 100644
---
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -98,7 +98,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreManagedLedgerO
private BlobStoreManagedLedgerOffloader getOffloader(String bucket) throws
IOException {
mockedConfig = mock(TieredStorageConfiguration.class,
delegatesTo(getConfiguration(bucket)));
Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use
the REAL blobStore
- BlobStoreManagedLedgerOffloader offloader =
BlobStoreManagedLedgerOffloader.create(mockedConfig, new
HashMap<String,String>(), scheduler, this.offloaderStats,
+ BlobStoreManagedLedgerOffloader offloader =
BlobStoreManagedLedgerOffloader.create(mockedConfig, new
HashMap<String,String>(), scheduler, scheduler, this.offloaderStats,
entryOffsetsCache);
return offloader;
}
@@ -106,7 +106,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreManagedLedgerO
private BlobStoreManagedLedgerOffloader getOffloader(String bucket,
BlobStore mockedBlobStore) throws IOException {
mockedConfig = mock(TieredStorageConfiguration.class,
delegatesTo(getConfiguration(bucket)));
Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
- BlobStoreManagedLedgerOffloader offloader =
BlobStoreManagedLedgerOffloader.create(mockedConfig, new
HashMap<String,String>(), scheduler, this.offloaderStats,
+ BlobStoreManagedLedgerOffloader offloader =
BlobStoreManagedLedgerOffloader.create(mockedConfig, new
HashMap<String,String>(), scheduler, scheduler, this.offloaderStats,
entryOffsetsCache);
return offloader;
}