sijie closed pull request #1808: Used OrderedScheduler for ledger offload
URL: https://github.com/apache/incubator-pulsar/pull/1808
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5f6dbca934..413127b0af 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -43,6 +43,7 @@
 
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
@@ -136,7 +137,7 @@
             .build();
     private final ScheduledExecutorService loadManagerExecutor;
     private ScheduledExecutorService compactorExecutor;
-    private ScheduledExecutorService offloaderScheduler;
+    private OrderedScheduler offloaderScheduler;
     private LedgerOffloader offloader;
     private ScheduledFuture<?> loadReportTask = null;
     private ScheduledFuture<?> loadSheddingTask = null;
@@ -719,11 +720,11 @@ public synchronized Compactor getCompactor() throws 
PulsarServerException {
         return this.compactor;
     }
 
-    protected synchronized ScheduledExecutorService 
getOffloaderScheduler(ServiceConfiguration conf) {
+    protected synchronized OrderedScheduler 
getOffloaderScheduler(ServiceConfiguration conf) {
         if (this.offloaderScheduler == null) {
-            this.offloaderScheduler = Executors.newScheduledThreadPool(
-                    conf.getManagedLedgerOffloadMaxThreads(),
-                    new DefaultThreadFactory("offloader-"));
+            this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
+                .numThreads(conf.getManagedLedgerOffloadMaxThreads())
+                .name("offloader").build();
         }
         return this.offloaderScheduler;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
index 7a73a3bbc8..a041d99845 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -38,8 +38,8 @@
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -52,7 +52,7 @@
     private static final Logger log = 
LoggerFactory.getLogger(S3ManagedLedgerOffloader.class);
 
     public static final String DRIVER_NAME = "S3";
-    private final ScheduledExecutorService scheduler;
+    private final OrderedScheduler scheduler;
     private final AmazonS3 s3client;
     private final String bucket;
     // max block size for each data block.
@@ -60,7 +60,7 @@
     private final int readBufferSize;
 
     public static S3ManagedLedgerOffloader create(ServiceConfiguration conf,
-                                                  ScheduledExecutorService 
scheduler)
+                                                  OrderedScheduler scheduler)
             throws PulsarServerException {
         String region = conf.getS3ManagedLedgerOffloadRegion();
         String bucket = conf.getS3ManagedLedgerOffloadBucket();
@@ -85,7 +85,7 @@ public static S3ManagedLedgerOffloader 
create(ServiceConfiguration conf,
         return new S3ManagedLedgerOffloader(builder.build(), bucket, 
scheduler, maxBlockSize, readBufferSize);
     }
 
-    S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, 
ScheduledExecutorService scheduler,
+    S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, 
OrderedScheduler scheduler,
                              int maxBlockSize, int readBufferSize) {
         this.s3client = s3client;
         this.bucket = bucket;
@@ -108,7 +108,7 @@ static String indexBlockOffloadKey(long ledgerId, UUID 
uuid) {
                                            UUID uuid,
                                            Map<String, String> extraMetadata) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        scheduler.submit(() -> {
+        scheduler.chooseThread(readHandle.getId()).submit(() -> {
             OffloadIndexBlockBuilder indexBuilder = 
OffloadIndexBlockBuilder.create()
                 .withMetadata(readHandle.getLedgerMetadata());
             String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), 
uuid);
@@ -196,9 +196,10 @@ static String indexBlockOffloadKey(long ledgerId, UUID 
uuid) {
         CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
         String key = dataBlockOffloadKey(ledgerId, uid);
         String indexKey = indexBlockOffloadKey(ledgerId, uid);
-        scheduler.submit(() -> {
+        scheduler.chooseThread(ledgerId).submit(() -> {
                 try {
-                    promise.complete(S3BackedReadHandleImpl.open(scheduler, 
s3client,
+                    
promise.complete(S3BackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
+                                                                 s3client,
                                                                  bucket, key, 
indexKey,
                                                                  ledgerId, 
readBufferSize));
                 } catch (Throwable t) {
@@ -211,16 +212,14 @@ static String indexBlockOffloadKey(long ledgerId, UUID 
uuid) {
     @Override
     public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        scheduler.submit(() -> {
+        scheduler.chooseThread(ledgerId).submit(() -> {
             try {
-
                 s3client.deleteObjects(new DeleteObjectsRequest(bucket)
                     .withKeys(dataBlockOffloadKey(ledgerId, uid), 
indexBlockOffloadKey(ledgerId, uid)));
                 promise.complete(null);
             } catch (Throwable t) {
                 log.error("Failed delete s3 Object ", t);
                 promise.completeExceptionally(t);
-                return;
             }
         });
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
index ed44ddc586..21ceb97abc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
@@ -32,7 +32,6 @@
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
@@ -44,6 +43,7 @@
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -57,11 +57,11 @@
 class S3ManagedLedgerOffloaderTest extends S3TestBase {
     private static final int DEFAULT_BLOCK_SIZE = 5*1024*1024;
     private static final int DEFAULT_READ_BUFFER_SIZE = 1*1024*1024;
-    final ScheduledExecutorService scheduler;
+    final OrderedScheduler scheduler;
     final MockBookKeeper bk;
 
     S3ManagedLedgerOffloaderTest() throws Exception {
-        scheduler = Executors.newScheduledThreadPool(1, new 
DefaultThreadFactory("offloader-"));
+        scheduler = 
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
         bk = new 
MockBookKeeper(MockedPulsarServiceBaseTest.createMockZooKeeper());
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to