This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f0b1471 ReadHandle implementation backed by S3 (#1790) f0b1471 is described below commit f0b1471a438ea0b4f77c1c00e0b4c64eb57074f7 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Wed May 16 19:19:11 2018 +0200 ReadHandle implementation backed by S3 (#1790) Implementation of bookkeeper ReadHandle, which reads from an S3 object. Master Issue: #1511 --- conf/broker.conf | 3 + .../apache/pulsar/broker/ServiceConfiguration.java | 12 + .../pulsar/broker/s3offload/OffloadIndexEntry.java | 7 +- .../broker/s3offload/S3ManagedLedgerOffloader.java | 37 ++- .../s3offload/impl/OffloadIndexEntryImpl.java | 4 + .../s3offload/impl/S3BackedReadHandleImpl.java | 207 +++++++++++++++ .../s3offload/S3ManagedLedgerOffloaderTest.java | 278 ++++++++++++--------- 7 files changed, 421 insertions(+), 127 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index cf9faa8..39523cd 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -495,6 +495,9 @@ s3ManagedLedgerOffloadServiceEndpoint= # For Amazon S3 ledger offload, Max block size in bytes. s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864 +# For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default) +s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576 + ### --- Deprecated config variables --- ### # Deprecated. Use configurationStoreServers diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 74363cc..d28ff4c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -487,6 +487,10 @@ public class ServiceConfiguration implements PulsarConfiguration { // For Amazon S3 ledger offload, Max block size in bytes. private int s3ManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024; + // For Amazon S3 ledger offload, Read buffer size in bytes. + @FieldContext(minValue = 1024) + private int s3ManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 1MB + public String getZookeeperServers() { return zookeeperServers; } @@ -1694,4 +1698,12 @@ public class ServiceConfiguration implements PulsarConfiguration { return this.s3ManagedLedgerOffloadMaxBlockSizeInBytes; } + public void setS3ManagedLedgerOffloadReadBufferSizeInBytes(int readBufferSizeInBytes) { + this.s3ManagedLedgerOffloadReadBufferSizeInBytes = readBufferSizeInBytes; + } + + public int getS3ManagedLedgerOffloadReadBufferSizeInBytes() { + return this.s3ManagedLedgerOffloadReadBufferSizeInBytes; + } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java index 03927d3..6a976ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java @@ -43,8 +43,13 @@ public interface OffloadIndexEntry { int getPartId(); /** - * Get the offset of this message entry in code storage. + * Get the offset of this block within the object. */ long getOffset(); + + /** + * Get the offset of the block's data within the object. + */ + long getDataOffset(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java index 76920de..4d5b388 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java @@ -45,6 +45,7 @@ import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl; +import org.apache.pulsar.broker.s3offload.impl.S3BackedReadHandleImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +58,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { private final String bucket; // max block size for each data block. private int maxBlockSize; + private final int readBufferSize; public static S3ManagedLedgerOffloader create(ServiceConfiguration conf, ScheduledExecutorService scheduler) @@ -65,6 +67,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { String bucket = conf.getS3ManagedLedgerOffloadBucket(); String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint(); int maxBlockSize = conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes(); + int readBufferSize = conf.getS3ManagedLedgerOffloadReadBufferSizeInBytes(); if (Strings.isNullOrEmpty(region)) { throw new PulsarServerException("s3ManagedLedgerOffloadRegion cannot be empty is s3 offload enabled"); @@ -80,22 +83,24 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { } else { builder.setRegion(region); } - return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler, maxBlockSize); + return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler, maxBlockSize, readBufferSize); } - S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, ScheduledExecutorService scheduler, int maxBlockSize) { + S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, ScheduledExecutorService scheduler, + int maxBlockSize, int readBufferSize) { this.s3client = s3client; this.bucket = bucket; this.scheduler = scheduler; this.maxBlockSize = maxBlockSize; + this.readBufferSize = readBufferSize; } - static String dataBlockOffloadKey(ReadHandle readHandle, UUID uuid) { - return String.format("ledger-%d-%s", readHandle.getId(), uuid.toString()); + static String dataBlockOffloadKey(long ledgerId, UUID uuid) { + return String.format("ledger-%d-%s", ledgerId, uuid.toString()); } - static String indexBlockOffloadKey(ReadHandle readHandle, UUID uuid) { - return String.format("ledger-%d-%s-index", readHandle.getId(), uuid.toString()); + static String indexBlockOffloadKey(long ledgerId, UUID uuid) { + return String.format("ledger-%d-%s-index", ledgerId, uuid.toString()); } // upload DataBlock to s3 using MultiPartUpload, and indexBlock in a new Block, @@ -107,8 +112,8 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { scheduler.submit(() -> { OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create() .withMetadata(readHandle.getLedgerMetadata()); - String dataBlockKey = dataBlockOffloadKey(readHandle, uuid); - String indexBlockKey = indexBlockOffloadKey(readHandle, uuid); + String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), uuid); + String indexBlockKey = indexBlockOffloadKey(readHandle.getId(), uuid); InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey); InitiateMultipartUploadResult dataBlockRes = null; @@ -174,12 +179,12 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { metadata.setContentLength(indexStream.available()); s3client.putObject(new PutObjectRequest( bucket, - indexBlockOffloadKey(readHandle, uuid), + indexBlockKey, indexStream, metadata)); promise.complete(null); } catch (Throwable t) { - s3client.deleteObject(bucket, dataBlockOffloadKey(readHandle, uuid)); + s3client.deleteObject(bucket, dataBlockKey); promise.completeExceptionally(t); return; } @@ -190,7 +195,17 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { @Override public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid) { CompletableFuture<ReadHandle> promise = new CompletableFuture<>(); - promise.completeExceptionally(new UnsupportedOperationException()); + String key = dataBlockOffloadKey(ledgerId, uid); + String indexKey = indexBlockOffloadKey(ledgerId, uid); + scheduler.submit(() -> { + try { + promise.complete(S3BackedReadHandleImpl.open(scheduler, s3client, + bucket, key, indexKey, + ledgerId, readBufferSize)); + } catch (Throwable t) { + promise.completeExceptionally(t); + } + }); return promise; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java index d74ba93..d8d2267 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java @@ -48,6 +48,10 @@ public class OffloadIndexEntryImpl implements OffloadIndexEntry { public long getOffset() { return offset; } + @Override + public long getDataOffset() { + return offset + DataBlockHeaderImpl.getDataStartOffset(); + } public OffloadIndexEntryImpl(long entryId, int partId, long offset) { this.entryId = entryId; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java new file mode 100644 index 0000000..037ea67 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.s3offload.impl; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; + +import java.io.DataInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; + +import org.apache.pulsar.broker.s3offload.OffloadIndexBlock; +import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder; +import org.apache.pulsar.broker.s3offload.OffloadIndexEntry; +import org.apache.pulsar.broker.s3offload.S3BackedInputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class S3BackedReadHandleImpl implements ReadHandle { + private static final Logger log = LoggerFactory.getLogger(S3BackedReadHandleImpl.class); + + private final long ledgerId; + private final OffloadIndexBlock index; + private final S3BackedInputStream inputStream; + private final DataInputStream dataStream; + private final ExecutorService executor; + + private S3BackedReadHandleImpl(long ledgerId, OffloadIndexBlock index, + S3BackedInputStream inputStream, + ExecutorService executor) { + this.ledgerId = ledgerId; + this.index = index; + this.inputStream = inputStream; + this.dataStream = new DataInputStream(inputStream); + this.executor = executor; + } + + @Override + public long getId() { + return ledgerId; + } + + @Override + public LedgerMetadata getLedgerMetadata() { + return index.getLedgerMetadata(); + } + + @Override + public CompletableFuture<Void> closeAsync() { + CompletableFuture<Void> promise = new CompletableFuture<>(); + executor.submit(() -> { + try { + index.close(); + inputStream.close(); + promise.complete(null); + } catch (IOException t) { + promise.completeExceptionally(t); + } + }); + return promise; + } + + @Override + public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) { + log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry); + CompletableFuture<LedgerEntries> promise = new CompletableFuture<>(); + executor.submit(() -> { + if (firstEntry > lastEntry + || firstEntry < 0 + || lastEntry > getLastAddConfirmed()) { + promise.completeExceptionally(new BKException.BKIncorrectParameterException()); + return; + } + long entriesToRead = (lastEntry - firstEntry) + 1; + List<LedgerEntry> entries = new ArrayList<LedgerEntry>(); + long nextExpectedId = firstEntry; + try { + OffloadIndexEntry entry = index.getIndexEntryForEntry(firstEntry); + inputStream.seek(entry.getDataOffset()); + + while (entriesToRead > 0) { + int length = dataStream.readInt(); + if (length < 0) { // hit padding or new block + inputStream.seekForward(index.getIndexEntryForEntry(nextExpectedId).getDataOffset()); + length = dataStream.readInt(); + } + long entryId = dataStream.readLong(); + + if (entryId == nextExpectedId) { + ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length); + entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf)); + int toWrite = length; + while (toWrite > 0) { + toWrite -= buf.writeBytes(dataStream, toWrite); + } + entriesToRead--; + nextExpectedId++; + } else if (entryId > lastEntry) { + log.info("Expected to read {}, but read {}, which is greater than last entry {}", + nextExpectedId, entryId, lastEntry); + throw new BKException.BKUnexpectedConditionException(); + } else { + inputStream.skip(length); + } + } + + promise.complete(LedgerEntriesImpl.create(entries)); + } catch (Throwable t) { + promise.completeExceptionally(t); + entries.forEach(LedgerEntry::close); + } + }); + return promise; + } + + @Override + public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) { + return readAsync(firstEntry, lastEntry); + } + + @Override + public CompletableFuture<Long> readLastAddConfirmedAsync() { + return CompletableFuture.completedFuture(getLastAddConfirmed()); + } + + @Override + public CompletableFuture<Long> tryReadLastAddConfirmedAsync() { + return CompletableFuture.completedFuture(getLastAddConfirmed()); + } + + @Override + public long getLastAddConfirmed() { + return getLedgerMetadata().getLastEntryId(); + } + + @Override + public long getLength() { + return getLedgerMetadata().getLength(); + } + + @Override + public boolean isClosed() { + return getLedgerMetadata().isClosed(); + } + + @Override + public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId, + long timeOutInMillis, + boolean parallel) { + CompletableFuture<LastConfirmedAndEntry> promise = new CompletableFuture<>(); + promise.completeExceptionally(new UnsupportedOperationException()); + return promise; + } + + public static ReadHandle open(ScheduledExecutorService executor, + AmazonS3 s3client, String bucket, String key, String indexKey, + long ledgerId, int readBufferSize) + throws AmazonClientException, IOException { + GetObjectRequest req = new GetObjectRequest(bucket, indexKey); + try (S3Object obj = s3client.getObject(req)) { + OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create(); + OffloadIndexBlock index = indexBuilder.fromStream(obj.getObjectContent()); + + ObjectMetadata dataMetadata = s3client.getObjectMetadata(bucket, key); // FIXME: this should be part of index + S3BackedInputStream inputStream = new S3BackedInputStreamImpl(s3client, bucket, key, + dataMetadata.getContentLength(), + readBufferSize); + return new S3BackedReadHandleImpl(ledgerId, index, inputStream, executor); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java index 583b0e0..f9a043b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java @@ -24,20 +24,29 @@ import static org.mockito.Matchers.any; import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.S3Object; import io.netty.util.concurrent.DefaultThreadFactory; import java.io.DataInputStream; +import java.lang.reflect.Method; import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Random; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import lombok.extern.slf4j.Slf4j; + +import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.LedgerMetadata; import org.apache.bookkeeper.client.MockBookKeeper; -import org.apache.bookkeeper.client.MockLedgerHandle; +import org.apache.bookkeeper.client.api.DigestType; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.pulsar.broker.PulsarServerException; @@ -46,12 +55,14 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl; import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl; import org.apache.pulsar.broker.s3offload.impl.OffloadIndexBlockImpl; -import org.apache.pulsar.broker.s3offload.impl.OffloadIndexTest.LedgerMetadataMock; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; +@Slf4j class S3ManagedLedgerOffloaderTest extends S3TestBase { + private static final int DEFAULT_BLOCK_SIZE = 5*1024*1024; + private static final int DEFAULT_READ_BUFFER_SIZE = 1*1024*1024; final ScheduledExecutorService scheduler; final MockBookKeeper bk; @@ -60,86 +71,53 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { bk = new MockBookKeeper(MockedPulsarServiceBaseTest.createMockZooKeeper()); } - private ReadHandle buildReadHandle(int entryCount) throws Exception { - MockLedgerHandle lh = (MockLedgerHandle)bk.createLedger(1,1,1, BookKeeper.DigestType.CRC32, "foobar".getBytes()); + private ReadHandle buildReadHandle() throws Exception { + return buildReadHandle(DEFAULT_BLOCK_SIZE, 1); + } - for (int index = 0; index < entryCount; index ++) { - lh.addEntry(("foooobarrr").getBytes()); // add entry with 10 bytes data - } + private ReadHandle buildReadHandle(int maxBlockSize, int blockCount) throws Exception { + Assert.assertTrue(maxBlockSize > DataBlockHeaderImpl.getDataStartOffset()); - lh.close(); + LedgerHandle lh = bk.createLedger(1,1,1, BookKeeper.DigestType.CRC32, "foobar".getBytes()); - // mock ledgerMetadata, so the lac in metadata is not -1; - MockLedgerHandle spy = Mockito.spy(lh); - LedgerMetadataMock metadata = new LedgerMetadataMock(1, 1, 1, - DigestType.CRC32C, "foobar".getBytes(), null, false); - metadata.setLastEntryId(entryCount - 1); - Mockito.when(spy.getLedgerMetadata()).thenReturn(metadata); + int i = 0; + int bytesWrittenCurrentBlock = DataBlockHeaderImpl.getDataStartOffset(); + int blocksWritten = 1; + int entries = 0; - return spy; - } + while (blocksWritten < blockCount + || bytesWrittenCurrentBlock < maxBlockSize/2) { + byte[] entry = ("foobar"+i).getBytes(); + int sizeInBlock = entry.length + 12 /* ENTRY_HEADER_SIZE */; - private void verifyS3ObjectRead(S3Object object, S3Object indexObject, ReadHandle readHandle, int indexEntryCount, int entryCount, int maxBlockSize) throws Exception { - DataInputStream dis = new DataInputStream(object.getObjectContent()); - int isLength = dis.available(); - - // read out index block - DataInputStream indexBlockIs = new DataInputStream(indexObject.getObjectContent()); - OffloadIndexBlock indexBlock = OffloadIndexBlockImpl.get(indexBlockIs); - - // 1. verify index block with passed in index entry count - Assert.assertEquals(indexBlock.getEntryCount(), indexEntryCount); - - // 2. verify index block read out each indexEntry. - int entryIdTracker = 0; - int startPartIdTracker = 1; - int startOffsetTracker = 0; - long entryBytesUploaded = 0; - int entryLength = 10; - for (int i = 0; i < indexEntryCount; i ++) { - // 2.1 verify each indexEntry in header block - OffloadIndexEntry indexEntry = indexBlock.getIndexEntryForEntry(entryIdTracker); - - Assert.assertEquals(indexEntry.getPartId(), startPartIdTracker); - Assert.assertEquals(indexEntry.getEntryId(), entryIdTracker); - Assert.assertEquals(indexEntry.getOffset(), startOffsetTracker); - - // read out and verify each data block related to this index entry - // 2.2 verify data block header. - DataBlockHeader headerReadout = DataBlockHeaderImpl.fromStream(dis); - int expectedBlockSize = BlockAwareSegmentInputStreamImpl - .calculateBlockSize(maxBlockSize, readHandle, entryIdTracker, entryBytesUploaded); - Assert.assertEquals(headerReadout.getBlockLength(), expectedBlockSize); - Assert.assertEquals(headerReadout.getFirstEntryId(), entryIdTracker); - - // 2.3 verify data block - int entrySize = 0; - long entryId = 0; - for (int bytesReadout = headerReadout.getBlockLength() - DataBlockHeaderImpl.getDataStartOffset(); - bytesReadout > 0; - bytesReadout -= (4 + 8 + entrySize)) { - entrySize = dis.readInt(); - entryId = dis.readLong(); - byte[] bytes = new byte[(int) entrySize]; - dis.read(bytes); - - Assert.assertEquals(entrySize, entryLength); - Assert.assertEquals(entryId, entryIdTracker ++); - entryBytesUploaded += entrySize; + if (bytesWrittenCurrentBlock + sizeInBlock > maxBlockSize) { + bytesWrittenCurrentBlock = DataBlockHeaderImpl.getDataStartOffset(); + blocksWritten++; + entries = 0; } + entries++; - startPartIdTracker ++; - startOffsetTracker += headerReadout.getBlockLength(); + lh.addEntry(entry); + bytesWrittenCurrentBlock += sizeInBlock; + i++; } - return; + // workaround mock not closing metadata correctly + Method close = LedgerMetadata.class.getDeclaredMethod("close", long.class); + close.setAccessible(true); + close.invoke(lh.getLedgerMetadata(), lh.getLastAddConfirmed()); + + lh.close(); + + return bk.newOpenLedgerOp().withLedgerId(lh.getId()) + .withPassword("foobar".getBytes()).withDigestType(DigestType.CRC32).execute().get(); } @Test public void testHappyCase() throws Exception { - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, 1024); - - offloader.offload(buildReadHandle(1), UUID.randomUUID(), new HashMap<>()).get(); + LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); + offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get(); } @Test @@ -152,7 +130,7 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { LedgerOffloader offloader = S3ManagedLedgerOffloader.create(conf, scheduler); try { - offloader.offload(buildReadHandle(1), UUID.randomUUID(), new HashMap<>()).get(); + offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get(); Assert.fail("Shouldn't be able to add to bucket"); } catch (ExecutionException e) { Assert.assertTrue(e.getMessage().contains("NoSuchBucket")); @@ -188,36 +166,38 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { } @Test - public void testOffload() throws Exception { - int entryLength = 10; - int entryNumberEachBlock = 10; - ServiceConfiguration conf = new ServiceConfiguration(); - conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME); + public void testOffloadAndRead() throws Exception { + ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3); + LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); + UUID uuid = UUID.randomUUID(); + offloader.offload(toWrite, uuid, new HashMap<>()).get(); - conf.setS3ManagedLedgerOffloadBucket(BUCKET); - conf.setS3ManagedLedgerOffloadRegion("eu-west-1"); - conf.setS3ManagedLedgerOffloadServiceEndpoint(s3endpoint); - conf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes( - DataBlockHeaderImpl.getDataStartOffset() + (entryLength + 12) * entryNumberEachBlock); - LedgerOffloader offloader = S3ManagedLedgerOffloader.create(conf, scheduler); + ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid).get(); + Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed()); - // offload 30 entries, which will be placed into 3 data blocks. - int entryCount = 30; - ReadHandle readHandle = buildReadHandle(entryCount); - UUID uuid = UUID.randomUUID(); - offloader.offload(readHandle, uuid, new HashMap<>()).get(); + try (LedgerEntries toWriteEntries = toWrite.read(0, toWrite.getLastAddConfirmed()); + LedgerEntries toTestEntries = toTest.read(0, toTest.getLastAddConfirmed())) { + Iterator<LedgerEntry> toWriteIter = toWriteEntries.iterator(); + Iterator<LedgerEntry> toTestIter = toTestEntries.iterator(); - S3Object obj = s3client.getObject(BUCKET, dataBlockOffloadKey(readHandle, uuid)); - S3Object indexObj = s3client.getObject(BUCKET, S3ManagedLedgerOffloader.indexBlockOffloadKey(readHandle, uuid)); + while (toWriteIter.hasNext() && toTestIter.hasNext()) { + LedgerEntry toWriteEntry = toWriteIter.next(); + LedgerEntry toTestEntry = toTestIter.next(); - verifyS3ObjectRead(obj, indexObj, readHandle, 3, 30, conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes()); + Assert.assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId()); + Assert.assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId()); + Assert.assertEquals(toWriteEntry.getLength(), toTestEntry.getLength()); + Assert.assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer()); + } + Assert.assertFalse(toWriteIter.hasNext()); + Assert.assertFalse(toTestIter.hasNext()); + } } @Test public void testOffloadFailInitDataBlockUpload() throws Exception { - int maxBlockSize = 1024; - int entryCount = 3; - ReadHandle readHandle = buildReadHandle(entryCount); + ReadHandle readHandle = buildReadHandle(); UUID uuid = UUID.randomUUID(); String failureString = "fail InitDataBlockUpload"; @@ -228,23 +208,22 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { .doThrow(new AmazonServiceException(failureString)) .when(mockS3client).initiateMultipartUpload(any()); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, maxBlockSize); + LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, + DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); offloader.offload(readHandle, uuid, new HashMap<>()).get(); Assert.fail("Should throw exception when initiateMultipartUpload"); } catch (Exception e) { // excepted Assert.assertTrue(e.getCause() instanceof AmazonServiceException); Assert.assertTrue(e.getCause().getMessage().contains(failureString)); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle, uuid))); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle, uuid))); + Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); } } @Test public void testOffloadFailDataBlockPartUpload() throws Exception { - int maxBlockSize = 1024; - int entryCount = 3; - ReadHandle readHandle = buildReadHandle(entryCount); + ReadHandle readHandle = buildReadHandle(); UUID uuid = UUID.randomUUID(); String failureString = "fail DataBlockPartUpload"; @@ -256,23 +235,22 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { .when(mockS3client).uploadPart(any()); Mockito.doNothing().when(mockS3client).abortMultipartUpload(any()); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, maxBlockSize); + LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, + DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); offloader.offload(readHandle, uuid, new HashMap<>()).get(); Assert.fail("Should throw exception for when uploadPart"); } catch (Exception e) { // excepted Assert.assertTrue(e.getCause() instanceof AmazonServiceException); Assert.assertTrue(e.getCause().getMessage().contains(failureString)); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle, uuid))); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle, uuid))); + Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); } } @Test public void testOffloadFailDataBlockUploadComplete() throws Exception { - int maxBlockSize = 1024; - int entryCount = 3; - ReadHandle readHandle = buildReadHandle(entryCount); + ReadHandle readHandle = buildReadHandle(); UUID uuid = UUID.randomUUID(); String failureString = "fail DataBlockUploadComplete"; @@ -284,23 +262,22 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { .when(mockS3client).completeMultipartUpload(any()); Mockito.doNothing().when(mockS3client).abortMultipartUpload(any()); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, maxBlockSize); + LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, + DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); offloader.offload(readHandle, uuid, new HashMap<>()).get(); Assert.fail("Should throw exception for when completeMultipartUpload"); } catch (Exception e) { // excepted Assert.assertTrue(e.getCause() instanceof AmazonServiceException); Assert.assertTrue(e.getCause().getMessage().contains(failureString)); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle, uuid))); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle, uuid))); + Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); } } @Test public void testOffloadFailPutIndexBlock() throws Exception { - int maxBlockSize = 1024; - int entryCount = 3; - ReadHandle readHandle = buildReadHandle(entryCount); + ReadHandle readHandle = buildReadHandle(); UUID uuid = UUID.randomUUID(); String failureString = "fail putObject"; @@ -311,15 +288,86 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { .doThrow(new AmazonServiceException(failureString)) .when(mockS3client).putObject(any()); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, maxBlockSize); + LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, + DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); offloader.offload(readHandle, uuid, new HashMap<>()).get(); Assert.fail("Should throw exception for when putObject for index block"); } catch (Exception e) { // excepted Assert.assertTrue(e.getCause() instanceof AmazonServiceException); Assert.assertTrue(e.getCause().getMessage().contains(failureString)); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle, uuid))); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle, uuid))); + Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + } + } + + @Test + public void testOffloadReadRandomAccess() throws Exception { + ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3); + long[][] randomAccesses = new long[10][2]; + Random r = new Random(0); + for (int i = 0; i < 10; i++) { + long first = r.nextInt((int)toWrite.getLastAddConfirmed()); + long second = r.nextInt((int)toWrite.getLastAddConfirmed()); + if (second < first) { + long tmp = first; + first = second; + second = tmp; + } + randomAccesses[i][0] = first; + randomAccesses[i][1] = second; + } + + LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); + UUID uuid = UUID.randomUUID(); + offloader.offload(toWrite, uuid, new HashMap<>()).get(); + + ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid).get(); + Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed()); + + for (long[] access : randomAccesses) { + try (LedgerEntries toWriteEntries = toWrite.read(access[0], access[1]); + LedgerEntries toTestEntries = toTest.read(access[0], access[1])) { + Iterator<LedgerEntry> toWriteIter = toWriteEntries.iterator(); + Iterator<LedgerEntry> toTestIter = toTestEntries.iterator(); + + while (toWriteIter.hasNext() && toTestIter.hasNext()) { + LedgerEntry toWriteEntry = toWriteIter.next(); + LedgerEntry toTestEntry = toTestIter.next(); + + Assert.assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId()); + Assert.assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId()); + Assert.assertEquals(toWriteEntry.getLength(), toTestEntry.getLength()); + Assert.assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer()); + } + Assert.assertFalse(toWriteIter.hasNext()); + Assert.assertFalse(toTestIter.hasNext()); + } + } + } + + @Test + public void testOffloadReadInvalidEntryIds() throws Exception { + ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1); + LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); + UUID uuid = UUID.randomUUID(); + offloader.offload(toWrite, uuid, new HashMap<>()).get(); + + ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid).get(); + Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed()); + + try { + toTest.read(-1, -1); + Assert.fail("Shouldn't be able to read anything"); + } catch (BKException.BKIncorrectParameterException e) { + } + + try { + toTest.read(0, toTest.getLastAddConfirmed() + 1); + Assert.fail("Shouldn't be able to read anything"); + } catch (BKException.BKIncorrectParameterException e) { } } } -- To stop receiving notification emails like this one, please contact si...@apache.org.