ivankelly commented on a change in pull request #1746: PIP-17: impl offload() for S3ManagedLedgerOffloader URL: https://github.com/apache/incubator-pulsar/pull/1746#discussion_r187852225
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java ########## @@ -60,28 +80,119 @@ public static S3ManagedLedgerOffloader create(ServiceConfiguration conf, } else { builder.setRegion(region); } - return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler); + return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler, maxBlockSize); } - S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, ScheduledExecutorService scheduler) { + S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, ScheduledExecutorService scheduler, int maxBlockSize) { this.s3client = s3client; this.bucket = bucket; this.scheduler = scheduler; + this.maxBlockSize = maxBlockSize; } + static String dataBlockOffloadKey(ReadHandle readHandle, UUID uuid) { + return String.format("ledger-%d-%s", readHandle.getId(), uuid.toString()); + } + + static String indexBlockOffloadKey(ReadHandle readHandle, UUID uuid) { + return String.format("ledger-%d-%s-index", readHandle.getId(), uuid.toString()); + } + + // upload DataBlock to s3 using MultiPartUpload, and indexBlock in a new Block, @Override - public CompletableFuture<Void> offload(ReadHandle ledger, - UUID uid, + public CompletableFuture<Void> offload(ReadHandle readHandle, + UUID uuid, Map<String, String> extraMetadata) { CompletableFuture<Void> promise = new CompletableFuture<>(); scheduler.submit(() -> { - try { - s3client.putObject(bucket, uid.toString(), uid.toString()); - promise.complete(null); - } catch (Throwable t) { - promise.completeExceptionally(t); + OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create() + .withMetadata(readHandle.getLedgerMetadata()); + String dataBlockKey = dataBlockOffloadKey(readHandle, uuid); + String indexBlockKey = indexBlockOffloadKey(readHandle, uuid); + InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey); + InitiateMultipartUploadResult dataBlockRes = 
null; + + // init multi part upload for data block. + try { + dataBlockRes = s3client.initiateMultipartUpload(dataBlockReq); + } catch (Throwable t) { + if (dataBlockRes != null) { + s3client.abortMultipartUpload( + new AbortMultipartUploadRequest(bucket, dataBlockKey, dataBlockRes.getUploadId())); + } + promise.completeExceptionally(t); + return; + } + + // start multi part upload for data block. + try { + long startEntry = 0; + int partId = 1; + long entryBytesWritten = 0; + List<PartETag> etags = new LinkedList<>(); + while (startEntry <= readHandle.getLastAddConfirmed()) { + int blockSize = BlockAwareSegmentInputStreamImpl + .calculateBlockSize(maxBlockSize, readHandle, startEntry, entryBytesWritten); + + try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl( + readHandle, startEntry, blockSize)) { + + UploadPartResult uploadRes = s3client.uploadPart( + new UploadPartRequest() + .withBucketName(bucket) + .withKey(dataBlockKey) + .withUploadId(dataBlockRes.getUploadId()) + .withInputStream(blockStream) + .withPartSize(blockSize) + .withPartNumber(partId)); + etags.add(uploadRes.getPartETag()); + indexBuilder.addBlock(startEntry, partId, blockSize); + + if (blockStream.getEndEntryId() != -1) { + startEntry = blockStream.getEndEntryId() + 1; + } else { + // could not read entry from ledger. 
+ break; + } + entryBytesWritten += blockStream.getBlockEntryBytesCount(); + partId++; + } + } + + s3client.completeMultipartUpload(new CompleteMultipartUploadRequest() + .withBucketName(bucket).withKey(dataBlockKey) + .withUploadId(dataBlockRes.getUploadId()) + .withPartETags(etags)); + } catch (Throwable t) { + s3client.abortMultipartUpload( + new AbortMultipartUploadRequest(bucket, dataBlockKey, dataBlockRes.getUploadId())); + promise.completeExceptionally(t); + return; + } + + // upload index block + try (OffloadIndexBlock index = indexBuilder.build(); + InputStream indexStream = index.toStream()) { + // write the index block + ObjectMetadata metadata = new ObjectMetadata(); + metadata.setContentLength(indexStream.available()); + s3client.putObject(new PutObjectRequest( + bucket, + indexBlockOffloadKey(readHandle, uuid), + indexStream, + metadata)); + promise.complete(null); + } catch (Throwable t) { + if (s3client.doesObjectExist(bucket, dataBlockOffloadKey(readHandle, uuid))) { Review comment: There's no need to check: in S3, deleting an object that doesn't exist returns success. https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html Use deleteObjects() so that only one call is made. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services