This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new a0ee17124e3 [improve] [broker] Separate offload read and write thread
pool (#24025)
a0ee17124e3 is described below
commit a0ee17124e3481fa48db40aeb5fd1057008cec89
Author: Hang Chen <[email protected]>
AuthorDate: Tue Feb 25 19:27:34 2025 -0800
[improve] [broker] Separate offload read and write thread pool (#24025)
(cherry picked from commit b407a219c85fa276ae88481e5f93a8ad63c09068)
---
conf/broker.conf | 3 ++
.../bookkeeper/mledger/LedgerOffloaderFactory.java | 45 ++++++++++++++++++++++
.../apache/pulsar/broker/ServiceConfiguration.java | 6 +++
.../org/apache/pulsar/broker/PulsarService.java | 14 ++++++-
.../common/policies/data/OffloadPolicies.java | 2 +
.../common/policies/data/OffloadPoliciesImpl.java | 9 +++++
.../jcloud/JCloudLedgerOffloaderFactory.java | 14 ++++++-
.../impl/BlobStoreManagedLedgerOffloader.java | 16 +++++---
...obStoreManagedLedgerOffloaderStreamingTest.java | 4 +-
.../impl/BlobStoreManagedLedgerOffloaderTest.java | 4 +-
10 files changed, 105 insertions(+), 12 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 2a7befafed3..ec561141ac0 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1817,6 +1817,9 @@ managedLedgerOffloadDriver=
# Maximum number of thread pool threads for ledger offloading
managedLedgerOffloadMaxThreads=2
+# Maximum number of read thread pool threads for ledger offloading
+managedLedgerOffloadReadThreads=2
+
# The extraction directory of the nar package.
# Available for Protocol Handler, Additional Servlets, Entry Filter,
Offloaders, Broker Interceptor.
# Default is System.getProperty("java.io.tmpdir").
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
index 9fbf9b73c05..80135925913 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
@@ -72,6 +72,27 @@ public interface LedgerOffloaderFactory<T extends
LedgerOffloader> extends AutoC
LedgerOffloaderStats offloaderStats)
throws IOException;
+ /**
+ * Create a ledger offloader with the provided configuration,
user-metadata,
+ * scheduler, readExecutor and offloaderStats.
+ *
+ * @param offloadPolicies offload policies
+ * @param userMetadata user metadata
+ * @param scheduler scheduler
+ * @param readExecutor read executor
+ * @param offloaderStats offloaderStats
+ * @return the offloader instance
+ * @throws IOException when fail to create an offloader
+ */
+ default T create(OffloadPoliciesImpl offloadPolicies,
+ Map<String, String> userMetadata,
+ OrderedScheduler scheduler,
+ OrderedScheduler readExecutor,
+ LedgerOffloaderStats offloaderStats)
+ throws IOException {
+ return create(offloadPolicies, userMetadata, scheduler,
offloaderStats);
+ }
+
/**
* Create a ledger offloader with the provided configuration,
user-metadata, schema storage and scheduler.
@@ -112,6 +133,30 @@ public interface LedgerOffloaderFactory<T extends
LedgerOffloader> extends AutoC
return create(offloadPolicies, userMetadata, scheduler,
offloaderStats);
}
+
+ /**
+ * Create a ledger offloader with the provided configuration,
user-metadata, schema storage,
+ * scheduler, readExecutor and offloaderStats.
+ *
+ * @param offloadPolicies offload policies
+ * @param userMetadata user metadata
+ * @param schemaStorage used for schema lookup in offloader
+ * @param scheduler scheduler
+ * @param readExecutor read executor
+ * @param offloaderStats offloaderStats
+ * @return the offloader instance
+ * @throws IOException when fail to create an offloader
+ */
+ default T create(OffloadPoliciesImpl offloadPolicies,
+ Map<String, String> userMetadata,
+ SchemaStorage schemaStorage,
+ OrderedScheduler scheduler,
+ OrderedScheduler readExecutor,
+ LedgerOffloaderStats offloaderStats)
+ throws IOException {
+ return create(offloadPolicies, userMetadata, scheduler, readExecutor,
offloaderStats);
+ }
+
@Override
default void close() throws Exception {
// no-op
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c8d65cb46af..8a28e3e7377 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3418,6 +3418,12 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private int managedLedgerOffloadMaxThreads = 2;
+ @FieldContext(
+ category = CATEGORY_STORAGE_OFFLOADING,
+ doc = "Maximum number of thread pool threads for offloaded ledger
reading"
+ )
+ private int managedLedgerOffloadReadThreads = 2;
+
@FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
doc = "The directory where nar Extraction of offloaders happens"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 272b5222580..5fd63502f8e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -232,6 +232,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
private final ScheduledExecutorService loadManagerExecutor;
private ScheduledExecutorService compactorExecutor;
private OrderedScheduler offloaderScheduler;
+ private OrderedScheduler offloaderReadExecutor;
private OffloadersCache offloadersCache = new OffloadersCache();
private LedgerOffloader defaultOffloader;
private LedgerOffloaderStats offloaderStats;
@@ -643,6 +644,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
executorServicesShutdown.shutdown(compactorExecutor);
executorServicesShutdown.shutdown(offloaderScheduler);
+ executorServicesShutdown.shutdown(offloaderReadExecutor);
executorServicesShutdown.shutdown(executor);
executorServicesShutdown.shutdown(orderedExecutor);
@@ -1608,7 +1610,8 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
LedgerOffloader.METADATA_PULSAR_CLUSTER_NAME.toLowerCase(),
config.getClusterName()
),
- schemaStorage,
getOffloaderScheduler(offloadPolicies), this.offloaderStats);
+ schemaStorage,
getOffloaderScheduler(offloadPolicies),
+ getOffloaderReadScheduler(offloadPolicies),
this.offloaderStats);
} catch (IOException ioe) {
throw new PulsarServerException(ioe.getMessage(),
ioe.getCause());
}
@@ -1680,6 +1683,15 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
return this.offloaderScheduler;
}
+ protected synchronized OrderedScheduler
getOffloaderReadScheduler(OffloadPoliciesImpl offloadPolicies) {
+ if (this.offloaderReadExecutor == null) {
+ this.offloaderReadExecutor = OrderedScheduler.newSchedulerBuilder()
+
.numThreads(offloadPolicies.getManagedLedgerOffloadReadThreads())
+ .name("offloader-read").build();
+ }
+ return this.offloaderReadExecutor;
+ }
+
public PulsarClientImpl createClientImpl(ClientConfigurationData conf)
throws PulsarClientException {
return createClientImpl(conf, null);
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index 9073de658d4..924153e71ff 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -87,6 +87,8 @@ public interface OffloadPolicies {
Builder managedLedgerOffloadMaxThreads(Integer
managedLedgerOffloadMaxThreads);
+ Builder managedLedgerOffloadReadThreads(Integer
managedLedgerOffloadReadThreads);
+
Builder managedLedgerOffloadPrefetchRounds(Integer
managedLedgerOffloadPrefetchRounds);
Builder managedLedgerOffloadThresholdInBytes(Long
managedLedgerOffloadThresholdInBytes);
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index 37059d105e1..22e72d7cde8 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -83,6 +83,7 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
public static final int DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES = 128 * 1024 *
1024; // 128MiB
public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;
// 1MiB
public static final int DEFAULT_OFFLOAD_MAX_THREADS = 2;
+ public static final int DEFAULT_OFFLOAD_READ_THREADS = 2;
public static final int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
public static final String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
@@ -108,6 +109,9 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
private Integer managedLedgerOffloadMaxThreads =
DEFAULT_OFFLOAD_MAX_THREADS;
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
+ private Integer managedLedgerOffloadReadThreads =
DEFAULT_OFFLOAD_READ_THREADS;
+ @Configuration
+ @JsonProperty(access = JsonProperty.Access.READ_WRITE)
private Integer managedLedgerOffloadPrefetchRounds =
DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS;
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
@@ -499,6 +503,11 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
return this;
}
+ public OffloadPoliciesImplBuilder
managedLedgerOffloadReadThreads(Integer managedLedgerOffloadReadThreads) {
+ impl.managedLedgerOffloadReadThreads =
managedLedgerOffloadReadThreads;
+ return this;
+ }
+
public OffloadPoliciesImplBuilder managedLedgerOffloadPrefetchRounds(
Integer managedLedgerOffloadPrefetchRounds) {
impl.managedLedgerOffloadPrefetchRounds =
managedLedgerOffloadPrefetchRounds;
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
index 60363cf8406..ef2db046e3c 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
@@ -54,7 +54,19 @@ public class JCloudLedgerOffloaderFactory implements
LedgerOffloaderFactory<Blob
TieredStorageConfiguration config =
TieredStorageConfiguration.create(offloadPolicies.toProperties());
- return BlobStoreManagedLedgerOffloader.create(config, userMetadata,
scheduler, offloaderStats,
+ return BlobStoreManagedLedgerOffloader.create(config, userMetadata,
scheduler, scheduler, offloaderStats,
+ entryOffsetsCache);
+ }
+
+ @Override
+ public BlobStoreManagedLedgerOffloader create(OffloadPoliciesImpl
offloadPolicies, Map<String, String> userMetadata,
+ OrderedScheduler scheduler,
+ OrderedScheduler
readExecutor,
+ LedgerOffloaderStats
offloaderStats) throws IOException {
+
+ TieredStorageConfiguration config =
+
TieredStorageConfiguration.create(offloadPolicies.toProperties());
+ return BlobStoreManagedLedgerOffloader.create(config, userMetadata,
scheduler, readExecutor, offloaderStats,
entryOffsetsCache);
}
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 9f16ce04da5..33bbc49ee22 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -97,6 +97,7 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
private static final String MANAGED_LEDGER_NAME = "ManagedLedgerName";
private final OrderedScheduler scheduler;
+ private final OrderedScheduler readExecutor;
private final TieredStorageConfiguration config;
private final OffloadPolicies policies;
private final Location writeLocation;
@@ -125,18 +126,21 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
public static BlobStoreManagedLedgerOffloader
create(TieredStorageConfiguration config,
Map<String, String>
userMetadata,
OrderedScheduler
scheduler,
+ OrderedScheduler
readExecutor,
LedgerOffloaderStats
offloaderStats,
OffsetsCache
entryOffsetsCache)
throws IOException {
- return new BlobStoreManagedLedgerOffloader(config, scheduler,
userMetadata, offloaderStats, entryOffsetsCache);
+ return new BlobStoreManagedLedgerOffloader(config, scheduler,
readExecutor,
+ userMetadata, offloaderStats, entryOffsetsCache);
}
BlobStoreManagedLedgerOffloader(TieredStorageConfiguration config,
OrderedScheduler scheduler,
+ OrderedScheduler readExecutor,
Map<String, String> userMetadata,
LedgerOffloaderStats offloaderStats,
OffsetsCache entryOffsetsCache) {
-
this.scheduler = scheduler;
+ this.readExecutor = readExecutor;
this.userMetadata = userMetadata;
this.config = config;
Properties properties = new Properties();
@@ -561,10 +565,10 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
String key = DataBlockUtils.dataBlockOffloadKey(ledgerId, uid);
String indexKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uid);
- scheduler.chooseThread(ledgerId).execute(() -> {
+ readExecutor.chooseThread(ledgerId).execute(() -> {
try {
BlobStore readBlobstore =
getBlobStore(config.getBlobStoreLocation());
-
promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
+
promise.complete(BlobStoreBackedReadHandleImpl.open(readExecutor.chooseThread(ledgerId),
readBlobstore,
readBucket, key, indexKey,
DataBlockUtils.VERSION_CHECK,
@@ -596,10 +600,10 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
indexKeys.add(indexKey);
});
- scheduler.chooseThread(ledgerId).execute(() -> {
+ readExecutor.chooseThread(ledgerId).execute(() -> {
try {
BlobStore readBlobstore =
getBlobStore(config.getBlobStoreLocation());
-
promise.complete(BlobStoreBackedReadHandleImplV2.open(scheduler.chooseThread(ledgerId),
+
promise.complete(BlobStoreBackedReadHandleImplV2.open(readExecutor.chooseThread(ledgerId),
readBlobstore,
readBucket, keys, indexKeys,
DataBlockUtils.VERSION_CHECK,
diff --git
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
index 6eca3756df8..843ea2931d4 100644
---
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
+++
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
@@ -82,7 +82,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest
extends BlobStoreManag
mockedConfig = mock(TieredStorageConfiguration.class,
delegatesTo(getConfiguration(bucket, additionalConfig)));
Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use
the REAL blobStore
BlobStoreManagedLedgerOffloader offloader =
BlobStoreManagedLedgerOffloader
- .create(mockedConfig, new HashMap<String, String>(), scheduler,
+ .create(mockedConfig, new HashMap<String, String>(),
scheduler, scheduler,
this.offloaderStats, entryOffsetsCache);
return offloader;
}
@@ -92,7 +92,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest
extends BlobStoreManag
mockedConfig = mock(TieredStorageConfiguration.class,
delegatesTo(getConfiguration(bucket, additionalConfig)));
Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
BlobStoreManagedLedgerOffloader offloader =
BlobStoreManagedLedgerOffloader
- .create(mockedConfig, new HashMap<String, String>(), scheduler,
+ .create(mockedConfig, new HashMap<String, String>(),
scheduler, scheduler,
this.offloaderStats, entryOffsetsCache);
return offloader;
}
diff --git
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index 1613eb6c653..0d9f3f35a09 100644
---
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -99,7 +99,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreManagedLedgerO
mockedConfig = mock(TieredStorageConfiguration.class,
delegatesTo(getConfiguration(bucket)));
Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use
the REAL blobStore
BlobStoreManagedLedgerOffloader offloader =
BlobStoreManagedLedgerOffloader.create(mockedConfig,
- new HashMap<String, String>(), scheduler, this.offloaderStats,
+ new HashMap<String, String>(), scheduler, scheduler,
this.offloaderStats,
entryOffsetsCache);
return offloader;
}
@@ -108,7 +108,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreManagedLedgerO
mockedConfig = mock(TieredStorageConfiguration.class,
delegatesTo(getConfiguration(bucket)));
Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
BlobStoreManagedLedgerOffloader offloader =
BlobStoreManagedLedgerOffloader.create(mockedConfig,
- new HashMap<String, String>(), scheduler, this.offloaderStats,
+ new HashMap<String, String>(), scheduler, scheduler,
this.offloaderStats,
entryOffsetsCache);
return offloader;
}