This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fae2343  Used OrderedScheduler for ledger offload (#1808)
fae2343 is described below

commit fae23437488d8f44a2f7176509750ddd8d168917
Author: Ivan Kelly <[email protected]>
AuthorDate: Mon May 21 09:55:46 2018 +0200

    Used OrderedScheduler for ledger offload (#1808)
    
    We want to ensure that operations for the same ledger do not
    overlap. This is particularly important for the ReadHandle as it has
    state (the buffer) which could be corrupted by multiple concurrent
    calls.
    
    To avoid this overlap, we use an OrderedScheduler, so all operations
    on a single ledger will run on the same single-threaded executor.
    
    Master Issue: #1511
---
 .../java/org/apache/pulsar/broker/PulsarService.java  | 11 ++++++-----
 .../broker/s3offload/S3ManagedLedgerOffloader.java    | 19 +++++++++----------
 .../s3offload/S3ManagedLedgerOffloaderTest.java       |  6 +++---
 3 files changed, 18 insertions(+), 18 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 3168982..bfc4ce5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -43,6 +43,7 @@ import java.util.function.Supplier;
 
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
@@ -136,7 +137,7 @@ public class PulsarService implements AutoCloseable {
             .build();
     private final ScheduledExecutorService loadManagerExecutor;
     private ScheduledExecutorService compactorExecutor;
-    private ScheduledExecutorService offloaderScheduler;
+    private OrderedScheduler offloaderScheduler;
     private LedgerOffloader offloader;
     private ScheduledFuture<?> loadReportTask = null;
     private ScheduledFuture<?> loadSheddingTask = null;
@@ -723,11 +724,11 @@ public class PulsarService implements AutoCloseable {
         return this.compactor;
     }
 
-    protected synchronized ScheduledExecutorService 
getOffloaderScheduler(ServiceConfiguration conf) {
+    protected synchronized OrderedScheduler 
getOffloaderScheduler(ServiceConfiguration conf) {
         if (this.offloaderScheduler == null) {
-            this.offloaderScheduler = Executors.newScheduledThreadPool(
-                    conf.getManagedLedgerOffloadMaxThreads(),
-                    new DefaultThreadFactory("offloader-"));
+            this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
+                .numThreads(conf.getManagedLedgerOffloadMaxThreads())
+                .name("offloader").build();
         }
         return this.offloaderScheduler;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
index 276488d..7c8113a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -38,8 +38,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -52,7 +52,7 @@ public class S3ManagedLedgerOffloader implements 
LedgerOffloader {
     private static final Logger log = 
LoggerFactory.getLogger(S3ManagedLedgerOffloader.class);
 
     public static final String DRIVER_NAME = "S3";
-    private final ScheduledExecutorService scheduler;
+    private final OrderedScheduler scheduler;
     private final AmazonS3 s3client;
     private final String bucket;
     // max block size for each data block.
@@ -60,7 +60,7 @@ public class S3ManagedLedgerOffloader implements 
LedgerOffloader {
     private final int readBufferSize;
 
     public static S3ManagedLedgerOffloader create(ServiceConfiguration conf,
-                                                  ScheduledExecutorService 
scheduler)
+                                                  OrderedScheduler scheduler)
             throws PulsarServerException {
         String region = conf.getS3ManagedLedgerOffloadRegion();
         String bucket = conf.getS3ManagedLedgerOffloadBucket();
@@ -85,7 +85,7 @@ public class S3ManagedLedgerOffloader implements 
LedgerOffloader {
         return new S3ManagedLedgerOffloader(builder.build(), bucket, 
scheduler, maxBlockSize, readBufferSize);
     }
 
-    S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, 
ScheduledExecutorService scheduler,
+    S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, 
OrderedScheduler scheduler,
                              int maxBlockSize, int readBufferSize) {
         this.s3client = s3client;
         this.bucket = bucket;
@@ -108,7 +108,7 @@ public class S3ManagedLedgerOffloader implements 
LedgerOffloader {
                                            UUID uuid,
                                            Map<String, String> extraMetadata) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        scheduler.submit(() -> {
+        scheduler.chooseThread(readHandle.getId()).submit(() -> {
             OffloadIndexBlockBuilder indexBuilder = 
OffloadIndexBlockBuilder.create()
                 .withLedgerMetadata(readHandle.getLedgerMetadata());
             String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), 
uuid);
@@ -199,9 +199,10 @@ public class S3ManagedLedgerOffloader implements 
LedgerOffloader {
         CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
         String key = dataBlockOffloadKey(ledgerId, uid);
         String indexKey = indexBlockOffloadKey(ledgerId, uid);
-        scheduler.submit(() -> {
+        scheduler.chooseThread(ledgerId).submit(() -> {
                 try {
-                    promise.complete(S3BackedReadHandleImpl.open(scheduler, 
s3client,
+                    
promise.complete(S3BackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
+                                                                 s3client,
                                                                  bucket, key, 
indexKey,
                                                                  ledgerId, 
readBufferSize));
                 } catch (Throwable t) {
@@ -214,16 +215,14 @@ public class S3ManagedLedgerOffloader implements 
LedgerOffloader {
     @Override
     public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        scheduler.submit(() -> {
+        scheduler.chooseThread(ledgerId).submit(() -> {
             try {
-
                 s3client.deleteObjects(new DeleteObjectsRequest(bucket)
                     .withKeys(dataBlockOffloadKey(ledgerId, uid), 
indexBlockOffloadKey(ledgerId, uid)));
                 promise.complete(null);
             } catch (Throwable t) {
                 log.error("Failed delete s3 Object ", t);
                 promise.completeExceptionally(t);
-                return;
             }
         });
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
index ed44ddc..21ceb97 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
@@ -32,7 +32,6 @@ import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
@@ -44,6 +43,7 @@ import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -57,11 +57,11 @@ import org.testng.annotations.Test;
 class S3ManagedLedgerOffloaderTest extends S3TestBase {
     private static final int DEFAULT_BLOCK_SIZE = 5*1024*1024;
     private static final int DEFAULT_READ_BUFFER_SIZE = 1*1024*1024;
-    final ScheduledExecutorService scheduler;
+    final OrderedScheduler scheduler;
     final MockBookKeeper bk;
 
     S3ManagedLedgerOffloaderTest() throws Exception {
-        scheduler = Executors.newScheduledThreadPool(1, new 
DefaultThreadFactory("offloader-"));
+        scheduler = 
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
         bk = new 
MockBookKeeper(MockedPulsarServiceBaseTest.createMockZooKeeper());
     }
 

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to