This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fae2343 Used OrderedScheduler for ledger offload (#1808)
fae2343 is described below
commit fae23437488d8f44a2f7176509750ddd8d168917
Author: Ivan Kelly <[email protected]>
AuthorDate: Mon May 21 09:55:46 2018 +0200
Used OrderedScheduler for ledger offload (#1808)
We want to ensure that operations for the same ledger do not
overlap. This is particularly important for the ReadHandle, as it has
state (the buffer) which could be corrupted by multiple concurrent
calls.
To avoid this overlap, we use an OrderedScheduler, so all operations
on a single ledger will run on the same single-threaded executor.
Master Issue: #1511
---
.../java/org/apache/pulsar/broker/PulsarService.java | 11 ++++++-----
.../broker/s3offload/S3ManagedLedgerOffloader.java | 19 +++++++++----------
.../s3offload/S3ManagedLedgerOffloaderTest.java | 6 +++---
3 files changed, 18 insertions(+), 18 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 3168982..bfc4ce5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -43,6 +43,7 @@ import java.util.function.Supplier;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
@@ -136,7 +137,7 @@ public class PulsarService implements AutoCloseable {
.build();
private final ScheduledExecutorService loadManagerExecutor;
private ScheduledExecutorService compactorExecutor;
- private ScheduledExecutorService offloaderScheduler;
+ private OrderedScheduler offloaderScheduler;
private LedgerOffloader offloader;
private ScheduledFuture<?> loadReportTask = null;
private ScheduledFuture<?> loadSheddingTask = null;
@@ -723,11 +724,11 @@ public class PulsarService implements AutoCloseable {
return this.compactor;
}
- protected synchronized ScheduledExecutorService
getOffloaderScheduler(ServiceConfiguration conf) {
+ protected synchronized OrderedScheduler
getOffloaderScheduler(ServiceConfiguration conf) {
if (this.offloaderScheduler == null) {
- this.offloaderScheduler = Executors.newScheduledThreadPool(
- conf.getManagedLedgerOffloadMaxThreads(),
- new DefaultThreadFactory("offloader-"));
+ this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
+ .numThreads(conf.getManagedLedgerOffloadMaxThreads())
+ .name("offloader").build();
}
return this.offloaderScheduler;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
index 276488d..7c8113a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -38,8 +38,8 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -52,7 +52,7 @@ public class S3ManagedLedgerOffloader implements
LedgerOffloader {
private static final Logger log =
LoggerFactory.getLogger(S3ManagedLedgerOffloader.class);
public static final String DRIVER_NAME = "S3";
- private final ScheduledExecutorService scheduler;
+ private final OrderedScheduler scheduler;
private final AmazonS3 s3client;
private final String bucket;
// max block size for each data block.
@@ -60,7 +60,7 @@ public class S3ManagedLedgerOffloader implements
LedgerOffloader {
private final int readBufferSize;
public static S3ManagedLedgerOffloader create(ServiceConfiguration conf,
- ScheduledExecutorService
scheduler)
+ OrderedScheduler scheduler)
throws PulsarServerException {
String region = conf.getS3ManagedLedgerOffloadRegion();
String bucket = conf.getS3ManagedLedgerOffloadBucket();
@@ -85,7 +85,7 @@ public class S3ManagedLedgerOffloader implements
LedgerOffloader {
return new S3ManagedLedgerOffloader(builder.build(), bucket,
scheduler, maxBlockSize, readBufferSize);
}
- S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket,
ScheduledExecutorService scheduler,
+ S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket,
OrderedScheduler scheduler,
int maxBlockSize, int readBufferSize) {
this.s3client = s3client;
this.bucket = bucket;
@@ -108,7 +108,7 @@ public class S3ManagedLedgerOffloader implements
LedgerOffloader {
UUID uuid,
Map<String, String> extraMetadata) {
CompletableFuture<Void> promise = new CompletableFuture<>();
- scheduler.submit(() -> {
+ scheduler.chooseThread(readHandle.getId()).submit(() -> {
OffloadIndexBlockBuilder indexBuilder =
OffloadIndexBlockBuilder.create()
.withLedgerMetadata(readHandle.getLedgerMetadata());
String dataBlockKey = dataBlockOffloadKey(readHandle.getId(),
uuid);
@@ -199,9 +199,10 @@ public class S3ManagedLedgerOffloader implements
LedgerOffloader {
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
String key = dataBlockOffloadKey(ledgerId, uid);
String indexKey = indexBlockOffloadKey(ledgerId, uid);
- scheduler.submit(() -> {
+ scheduler.chooseThread(ledgerId).submit(() -> {
try {
- promise.complete(S3BackedReadHandleImpl.open(scheduler,
s3client,
+
promise.complete(S3BackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
+ s3client,
bucket, key,
indexKey,
ledgerId,
readBufferSize));
} catch (Throwable t) {
@@ -214,16 +215,14 @@ public class S3ManagedLedgerOffloader implements
LedgerOffloader {
@Override
public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
CompletableFuture<Void> promise = new CompletableFuture<>();
- scheduler.submit(() -> {
+ scheduler.chooseThread(ledgerId).submit(() -> {
try {
-
s3client.deleteObjects(new DeleteObjectsRequest(bucket)
.withKeys(dataBlockOffloadKey(ledgerId, uid),
indexBlockOffloadKey(ledgerId, uid)));
promise.complete(null);
} catch (Throwable t) {
log.error("Failed delete s3 Object ", t);
promise.completeExceptionally(t);
- return;
}
});
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
index ed44ddc..21ceb97 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
@@ -32,7 +32,6 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
@@ -44,6 +43,7 @@ import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -57,11 +57,11 @@ import org.testng.annotations.Test;
class S3ManagedLedgerOffloaderTest extends S3TestBase {
private static final int DEFAULT_BLOCK_SIZE = 5*1024*1024;
private static final int DEFAULT_READ_BUFFER_SIZE = 1*1024*1024;
- final ScheduledExecutorService scheduler;
+ final OrderedScheduler scheduler;
final MockBookKeeper bk;
S3ManagedLedgerOffloaderTest() throws Exception {
- scheduler = Executors.newScheduledThreadPool(1, new
DefaultThreadFactory("offloader-"));
+ scheduler =
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
bk = new
MockBookKeeper(MockedPulsarServiceBaseTest.createMockZooKeeper());
}
--
To stop receiving notification emails like this one, please contact
[email protected].