This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new df6962e S3 offloader should throw an error on receiving an empty
ledger (#1855)
df6962e is described below
commit df6962e4ce359a22bcad097ac8cad3b55d975dd2
Author: Ivan Kelly <[email protected]>
AuthorDate: Tue May 29 20:07:51 2018 +0200
S3 offloader should throw an error on receiving an empty ledger (#1855)
ManagedLedger should never send an empty ledger for offload, as it's a
waste of resources. This patch adds a defensive check to ensure that
if the S3 offloader does get an empty ledger, it doesn't even attempt to
create any resources on the S3 side.
Master issue: #1511
---
.../broker/s3offload/S3ManagedLedgerOffloader.java | 5 +++++
.../s3offload/S3ManagedLedgerOffloaderTest.java | 25 ++++++++++++++++++++++
2 files changed, 30 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
index 65ffd37..dcfa9e8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -114,6 +114,11 @@ public class S3ManagedLedgerOffloader implements
LedgerOffloader {
Map<String, String> extraMetadata) {
CompletableFuture<Void> promise = new CompletableFuture<>();
scheduler.chooseThread(readHandle.getId()).submit(() -> {
+ if (readHandle.getLength() == 0 || !readHandle.isClosed() ||
readHandle.getLastAddConfirmed() < 0) {
+ promise.completeExceptionally(
+ new IllegalArgumentException("An empty or open ledger
should never be offloaded"));
+ return;
+ }
OffloadIndexBlockBuilder indexBuilder =
OffloadIndexBlockBuilder.create()
.withLedgerMetadata(readHandle.getLedgerMetadata())
.withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
index 9f0d253..1b5ad03 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.s3offload;
import static
org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.dataBlockOffloadKey;
import static
org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.indexBlockOffloadKey;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
@@ -30,6 +31,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@@ -422,5 +424,28 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
Assert.assertTrue(mockS3client.doesObjectExist(BUCKET,
indexBlockOffloadKey(readHandle.getId(), uuid)));
}
}
+
+ @Test
+ public void testOffloadEmpty() throws Exception {
+ CompletableFuture<LedgerEntries> noEntries = new CompletableFuture<>();
+ noEntries.completeExceptionally(new BKException.BKReadException());
+
+ ReadHandle readHandle = Mockito.mock(ReadHandle.class);
+ Mockito.doReturn(-1L).when(readHandle).getLastAddConfirmed();
+ Mockito.doReturn(noEntries).when(readHandle).readAsync(anyLong(),
anyLong());
+ Mockito.doReturn(0L).when(readHandle).getLength();
+ Mockito.doReturn(true).when(readHandle).isClosed();
+ Mockito.doReturn(1234L).when(readHandle).getId();
+
+ UUID uuid = UUID.randomUUID();
+ LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client,
BUCKET, scheduler,
+
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+ try {
+ offloader.offload(readHandle, uuid, new HashMap<>()).get();
+ Assert.fail("Shouldn't have been able to offload");
+ } catch (ExecutionException e) {
+ Assert.assertEquals(e.getCause().getClass(),
IllegalArgumentException.class);
+ }
+ }
}
--
To stop receiving notification emails like this one, please contact
[email protected].