This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new a0ee17124e3 [improve] [broker] Separate offload read and write thread 
pool (#24025)
a0ee17124e3 is described below

commit a0ee17124e3481fa48db40aeb5fd1057008cec89
Author: Hang Chen <[email protected]>
AuthorDate: Tue Feb 25 19:27:34 2025 -0800

    [improve] [broker] Separate offload read and write thread pool (#24025)
    
    (cherry picked from commit b407a219c85fa276ae88481e5f93a8ad63c09068)
---
 conf/broker.conf                                   |  3 ++
 .../bookkeeper/mledger/LedgerOffloaderFactory.java | 45 ++++++++++++++++++++++
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 +++
 .../org/apache/pulsar/broker/PulsarService.java    | 14 ++++++-
 .../common/policies/data/OffloadPolicies.java      |  2 +
 .../common/policies/data/OffloadPoliciesImpl.java  |  9 +++++
 .../jcloud/JCloudLedgerOffloaderFactory.java       | 14 ++++++-
 .../impl/BlobStoreManagedLedgerOffloader.java      | 16 +++++---
 ...obStoreManagedLedgerOffloaderStreamingTest.java |  4 +-
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  |  4 +-
 10 files changed, 105 insertions(+), 12 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 2a7befafed3..ec561141ac0 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1817,6 +1817,9 @@ managedLedgerOffloadDriver=
 # Maximum number of thread pool threads for ledger offloading
 managedLedgerOffloadMaxThreads=2
 
+# Maximum number of read thread pool threads for ledger offloading
+managedLedgerOffloadReadThreads=2
+
 # The extraction directory of the nar package.
 # Available for Protocol Handler, Additional Servlets, Entry Filter, 
Offloaders, Broker Interceptor.
 # Default is System.getProperty("java.io.tmpdir").
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
index 9fbf9b73c05..80135925913 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
@@ -72,6 +72,27 @@ public interface LedgerOffloaderFactory<T extends 
LedgerOffloader> extends AutoC
              LedgerOffloaderStats offloaderStats)
         throws IOException;
 
+    /**
+     * Create a ledger offloader with the provided configuration, 
user-metadata,
+     * scheduler, readExecutor and offloaderStats.
+     *
+     * @param offloadPolicies offload policies
+     * @param userMetadata user metadata
+     * @param scheduler scheduler
+     * @param readExecutor read executor
+     * @param offloaderStats offloaderStats
+     * @return the offloader instance
+     * @throws IOException if the offloader cannot be created
+     */
+    default T create(OffloadPoliciesImpl offloadPolicies,
+             Map<String, String> userMetadata,
+             OrderedScheduler scheduler,
+             OrderedScheduler readExecutor,
+             LedgerOffloaderStats offloaderStats)
+            throws IOException {
+        return create(offloadPolicies, userMetadata, scheduler, 
offloaderStats);
+    }
+
 
     /**
      * Create a ledger offloader with the provided configuration, 
user-metadata, schema storage and scheduler.
@@ -112,6 +133,30 @@ public interface LedgerOffloaderFactory<T extends 
LedgerOffloader> extends AutoC
         return create(offloadPolicies, userMetadata, scheduler, 
offloaderStats);
     }
 
+
+    /**
+     * Create a ledger offloader with the provided configuration, 
user-metadata, schema storage,
+     * scheduler, readExecutor and offloaderStats.
+     *
+     * @param offloadPolicies offload policies
+     * @param userMetadata user metadata
+     * @param schemaStorage used for schema lookup in offloader
+     * @param scheduler scheduler
+     * @param readExecutor read executor
+     * @param offloaderStats offloaderStats
+     * @return the offloader instance
+     * @throws IOException if the offloader cannot be created
+     */
+    default T create(OffloadPoliciesImpl offloadPolicies,
+                     Map<String, String> userMetadata,
+                     SchemaStorage schemaStorage,
+                     OrderedScheduler scheduler,
+                     OrderedScheduler readExecutor,
+                     LedgerOffloaderStats offloaderStats)
+            throws IOException {
+        return create(offloadPolicies, userMetadata, scheduler, readExecutor, 
offloaderStats);
+    }
+
     @Override
     default void close() throws Exception {
         // no-op
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c8d65cb46af..8a28e3e7377 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3418,6 +3418,12 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private int managedLedgerOffloadMaxThreads = 2;
 
+    @FieldContext(
+            category = CATEGORY_STORAGE_OFFLOADING,
+            doc = "Maximum number of thread pool threads for offloaded ledger 
reading"
+    )
+    private int managedLedgerOffloadReadThreads = 2;
+
     @FieldContext(
         category = CATEGORY_STORAGE_OFFLOADING,
         doc = "The directory where nar Extraction of offloaders happens"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 272b5222580..5fd63502f8e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -232,6 +232,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     private final ScheduledExecutorService loadManagerExecutor;
     private ScheduledExecutorService compactorExecutor;
     private OrderedScheduler offloaderScheduler;
+    private OrderedScheduler offloaderReadExecutor;
     private OffloadersCache offloadersCache = new OffloadersCache();
     private LedgerOffloader defaultOffloader;
     private LedgerOffloaderStats offloaderStats;
@@ -643,6 +644,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 
             executorServicesShutdown.shutdown(compactorExecutor);
             executorServicesShutdown.shutdown(offloaderScheduler);
+            executorServicesShutdown.shutdown(offloaderReadExecutor);
             executorServicesShutdown.shutdown(executor);
             executorServicesShutdown.shutdown(orderedExecutor);
 
@@ -1608,7 +1610,8 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                                         
LedgerOffloader.METADATA_PULSAR_CLUSTER_NAME.toLowerCase(),
                                         config.getClusterName()
                                 ),
-                                schemaStorage, 
getOffloaderScheduler(offloadPolicies), this.offloaderStats);
+                                schemaStorage, 
getOffloaderScheduler(offloadPolicies),
+                                getOffloaderReadScheduler(offloadPolicies), 
this.offloaderStats);
                     } catch (IOException ioe) {
                         throw new PulsarServerException(ioe.getMessage(), 
ioe.getCause());
                     }
@@ -1680,6 +1683,15 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         return this.offloaderScheduler;
     }
 
+    protected synchronized OrderedScheduler 
getOffloaderReadScheduler(OffloadPoliciesImpl offloadPolicies) {
+        if (this.offloaderReadExecutor == null) {
+            this.offloaderReadExecutor = OrderedScheduler.newSchedulerBuilder()
+                    
.numThreads(offloadPolicies.getManagedLedgerOffloadReadThreads())
+                    .name("offloader-read").build();
+        }
+        return this.offloaderReadExecutor;
+    }
+
     public PulsarClientImpl createClientImpl(ClientConfigurationData conf) 
throws PulsarClientException {
         return createClientImpl(conf, null);
     }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index 9073de658d4..924153e71ff 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -87,6 +87,8 @@ public interface OffloadPolicies {
 
         Builder managedLedgerOffloadMaxThreads(Integer 
managedLedgerOffloadMaxThreads);
 
+        Builder managedLedgerOffloadReadThreads(Integer 
managedLedgerOffloadReadThreads);
+
         Builder managedLedgerOffloadPrefetchRounds(Integer 
managedLedgerOffloadPrefetchRounds);
 
         Builder managedLedgerOffloadThresholdInBytes(Long 
managedLedgerOffloadThresholdInBytes);
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index 37059d105e1..22e72d7cde8 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -83,6 +83,7 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
     public static final int DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES = 128 * 1024 * 
1024;   // 128MiB
     public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;   
   // 1MiB
     public static final int DEFAULT_OFFLOAD_MAX_THREADS = 2;
+    public static final int DEFAULT_OFFLOAD_READ_THREADS = 2;
     public static final int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
     public static final String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
     public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
@@ -108,6 +109,9 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
     private Integer managedLedgerOffloadMaxThreads = 
DEFAULT_OFFLOAD_MAX_THREADS;
     @Configuration
     @JsonProperty(access = JsonProperty.Access.READ_WRITE)
+    private Integer managedLedgerOffloadReadThreads = 
DEFAULT_OFFLOAD_READ_THREADS;
+    @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Integer managedLedgerOffloadPrefetchRounds = 
DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS;
     @Configuration
     @JsonProperty(access = JsonProperty.Access.READ_WRITE)
@@ -499,6 +503,11 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
             return this;
         }
 
+        public OffloadPoliciesImplBuilder 
managedLedgerOffloadReadThreads(Integer managedLedgerOffloadReadThreads) {
+            impl.managedLedgerOffloadReadThreads = 
managedLedgerOffloadReadThreads;
+            return this;
+        }
+
         public OffloadPoliciesImplBuilder managedLedgerOffloadPrefetchRounds(
                 Integer managedLedgerOffloadPrefetchRounds) {
             impl.managedLedgerOffloadPrefetchRounds = 
managedLedgerOffloadPrefetchRounds;
diff --git 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
index 60363cf8406..ef2db046e3c 100644
--- 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
+++ 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
@@ -54,7 +54,19 @@ public class JCloudLedgerOffloaderFactory implements 
LedgerOffloaderFactory<Blob
 
         TieredStorageConfiguration config =
                 
TieredStorageConfiguration.create(offloadPolicies.toProperties());
-        return BlobStoreManagedLedgerOffloader.create(config, userMetadata, 
scheduler, offloaderStats,
+        return BlobStoreManagedLedgerOffloader.create(config, userMetadata, 
scheduler, scheduler, offloaderStats,
+                entryOffsetsCache);
+    }
+
+    @Override
+    public BlobStoreManagedLedgerOffloader create(OffloadPoliciesImpl 
offloadPolicies, Map<String, String> userMetadata,
+                                                  OrderedScheduler scheduler,
+                                                  OrderedScheduler 
readExecutor,
+                                                  LedgerOffloaderStats 
offloaderStats) throws IOException {
+
+        TieredStorageConfiguration config =
+                
TieredStorageConfiguration.create(offloadPolicies.toProperties());
+        return BlobStoreManagedLedgerOffloader.create(config, userMetadata, 
scheduler, readExecutor, offloaderStats,
                 entryOffsetsCache);
     }
 
diff --git 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 9f16ce04da5..33bbc49ee22 100644
--- 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -97,6 +97,7 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
     private static final String MANAGED_LEDGER_NAME = "ManagedLedgerName";
 
     private final OrderedScheduler scheduler;
+    private final OrderedScheduler readExecutor;
     private final TieredStorageConfiguration config;
     private final OffloadPolicies policies;
     private final Location writeLocation;
@@ -125,18 +126,21 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
     public static BlobStoreManagedLedgerOffloader 
create(TieredStorageConfiguration config,
                                                          Map<String, String> 
userMetadata,
                                                          OrderedScheduler 
scheduler,
+                                                         OrderedScheduler 
readExecutor,
                                                          LedgerOffloaderStats 
offloaderStats,
                                                          OffsetsCache 
entryOffsetsCache)
             throws IOException {
 
-        return new BlobStoreManagedLedgerOffloader(config, scheduler, 
userMetadata, offloaderStats, entryOffsetsCache);
+        return new BlobStoreManagedLedgerOffloader(config, scheduler, 
readExecutor,
+                userMetadata, offloaderStats, entryOffsetsCache);
     }
 
     BlobStoreManagedLedgerOffloader(TieredStorageConfiguration config, 
OrderedScheduler scheduler,
+                                    OrderedScheduler readExecutor,
                                     Map<String, String> userMetadata, 
LedgerOffloaderStats offloaderStats,
                                     OffsetsCache entryOffsetsCache) {
-
         this.scheduler = scheduler;
+        this.readExecutor = readExecutor;
         this.userMetadata = userMetadata;
         this.config = config;
         Properties properties = new Properties();
@@ -561,10 +565,10 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
         CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
         String key = DataBlockUtils.dataBlockOffloadKey(ledgerId, uid);
         String indexKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uid);
-        scheduler.chooseThread(ledgerId).execute(() -> {
+        readExecutor.chooseThread(ledgerId).execute(() -> {
             try {
                 BlobStore readBlobstore = 
getBlobStore(config.getBlobStoreLocation());
-                
promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
+                
promise.complete(BlobStoreBackedReadHandleImpl.open(readExecutor.chooseThread(ledgerId),
                         readBlobstore,
                         readBucket, key, indexKey,
                         DataBlockUtils.VERSION_CHECK,
@@ -596,10 +600,10 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
             indexKeys.add(indexKey);
         });
 
-        scheduler.chooseThread(ledgerId).execute(() -> {
+        readExecutor.chooseThread(ledgerId).execute(() -> {
             try {
                 BlobStore readBlobstore = 
getBlobStore(config.getBlobStoreLocation());
-                
promise.complete(BlobStoreBackedReadHandleImplV2.open(scheduler.chooseThread(ledgerId),
+                
promise.complete(BlobStoreBackedReadHandleImplV2.open(readExecutor.chooseThread(ledgerId),
                         readBlobstore,
                         readBucket, keys, indexKeys,
                         DataBlockUtils.VERSION_CHECK,
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
index 6eca3756df8..843ea2931d4 100644
--- 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
@@ -82,7 +82,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest 
extends BlobStoreManag
         mockedConfig = mock(TieredStorageConfiguration.class, 
delegatesTo(getConfiguration(bucket, additionalConfig)));
         Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use 
the REAL blobStore
         BlobStoreManagedLedgerOffloader offloader = 
BlobStoreManagedLedgerOffloader
-                .create(mockedConfig, new HashMap<String, String>(), scheduler,
+                .create(mockedConfig, new HashMap<String, String>(), 
scheduler, scheduler,
                         this.offloaderStats, entryOffsetsCache);
         return offloader;
     }
@@ -92,7 +92,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest 
extends BlobStoreManag
         mockedConfig = mock(TieredStorageConfiguration.class, 
delegatesTo(getConfiguration(bucket, additionalConfig)));
         Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
         BlobStoreManagedLedgerOffloader offloader = 
BlobStoreManagedLedgerOffloader
-                .create(mockedConfig, new HashMap<String, String>(), scheduler,
+                .create(mockedConfig, new HashMap<String, String>(), 
scheduler, scheduler,
                         this.offloaderStats, entryOffsetsCache);
         return offloader;
     }
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index 1613eb6c653..0d9f3f35a09 100644
--- 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -99,7 +99,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreManagedLedgerO
         mockedConfig = mock(TieredStorageConfiguration.class, 
delegatesTo(getConfiguration(bucket)));
         Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use 
the REAL blobStore
         BlobStoreManagedLedgerOffloader offloader = 
BlobStoreManagedLedgerOffloader.create(mockedConfig,
-                new HashMap<String, String>(), scheduler, this.offloaderStats,
+                new HashMap<String, String>(), scheduler, scheduler, 
this.offloaderStats,
                 entryOffsetsCache);
         return offloader;
     }
@@ -108,7 +108,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreManagedLedgerO
         mockedConfig = mock(TieredStorageConfiguration.class, 
delegatesTo(getConfiguration(bucket)));
         Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
         BlobStoreManagedLedgerOffloader offloader = 
BlobStoreManagedLedgerOffloader.create(mockedConfig,
-                new HashMap<String, String>(), scheduler, this.offloaderStats,
+                new HashMap<String, String>(), scheduler, scheduler, 
this.offloaderStats,
                 entryOffsetsCache);
         return offloader;
     }

Reply via email to