zhaijack commented on a change in pull request #1746: PIP-17: impl offload() for S3ManagedLedgerOffloader
URL: https://github.com/apache/incubator-pulsar/pull/1746#discussion_r187867412
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
 ##########
 @@ -60,28 +80,119 @@ public static S3ManagedLedgerOffloader create(ServiceConfiguration conf,
         } else {
             builder.setRegion(region);
         }
-        return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler);
+        return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler, maxBlockSize);
     }
 
-    S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, ScheduledExecutorService scheduler) {
+    S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, ScheduledExecutorService scheduler, int maxBlockSize) {
         this.s3client = s3client;
         this.bucket = bucket;
         this.scheduler = scheduler;
+        this.maxBlockSize = maxBlockSize;
     }
 
+    static String dataBlockOffloadKey(ReadHandle readHandle, UUID uuid) {
+        return String.format("ledger-%d-%s", readHandle.getId(), uuid.toString());
+    }
+
+    static String indexBlockOffloadKey(ReadHandle readHandle, UUID uuid) {
+        return String.format("ledger-%d-%s-index", readHandle.getId(), uuid.toString());
+    }
+
+    // upload DataBlock to s3 using MultiPartUpload, and indexBlock in a new Block,
     @Override
-    public CompletableFuture<Void> offload(ReadHandle ledger,
-                                           UUID uid,
+    public CompletableFuture<Void> offload(ReadHandle readHandle,
+                                           UUID uuid,
                                            Map<String, String> extraMetadata) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
         scheduler.submit(() -> {
-                try {
-                    s3client.putObject(bucket, uid.toString(), uid.toString());
-                    promise.complete(null);
-                } catch (Throwable t) {
-                    promise.completeExceptionally(t);
+            OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create()
+                .withMetadata(readHandle.getLedgerMetadata());
+            String dataBlockKey = dataBlockOffloadKey(readHandle, uuid);
+            String indexBlockKey = indexBlockOffloadKey(readHandle, uuid);
+            InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey);
+            InitiateMultipartUploadResult dataBlockRes = null;
+
+            // init multi part upload for data block.
+            try {
+                dataBlockRes = s3client.initiateMultipartUpload(dataBlockReq);
+            } catch (Throwable t) {
+                if (dataBlockRes != null) {
 
 Review comment:
   Thanks. This is to avoid a findify.s3mock error, where deleting or aborting
   a target that does not exist causes an error. Will change it.
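
   For illustration, the guard being discussed can be sketched as below. This
   is a minimal sketch, not the code in this pull request. FakeMultipartStore
   and GuardedAbortExample are hypothetical names, and the store is an
   in-memory stand-in rather than the AWS SDK or findify.s3mock API. It assumes
   only what is described above: aborting an upload that does not exist is an
   error, so cleanup should run only when an upload was actually opened.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical in-memory stand-in for a multipart-capable object store.
class FakeMultipartStore {
    private final Set<String> openUploads = new HashSet<>();
    private final boolean failOnInitiate;

    FakeMultipartStore(boolean failOnInitiate) {
        this.failOnInitiate = failOnInitiate;
    }

    String initiate(String key) {
        if (failOnInitiate) {
            throw new IllegalStateException("initiate failed for " + key);
        }
        String uploadId = UUID.randomUUID().toString();
        openUploads.add(uploadId);
        return uploadId;
    }

    void uploadPart(String uploadId, byte[] data) {
        if (!openUploads.contains(uploadId)) {
            throw new IllegalArgumentException("no such upload: " + uploadId);
        }
        if (data.length == 0) {
            throw new IllegalArgumentException("empty part");
        }
    }

    void complete(String uploadId) {
        if (!openUploads.remove(uploadId)) {
            throw new IllegalArgumentException("no such upload: " + uploadId);
        }
    }

    // Assumption taken from the comment above: aborting an unknown upload fails.
    void abort(String uploadId) {
        if (!openUploads.remove(uploadId)) {
            throw new IllegalArgumentException("no such upload: " + uploadId);
        }
    }

    int openUploadCount() {
        return openUploads.size();
    }
}

class GuardedAbortExample {
    // Returns true on success. On failure, aborts only if an upload id exists.
    static boolean upload(FakeMultipartStore store, String key, byte[] data) {
        String uploadId = null;
        try {
            uploadId = store.initiate(key);
            store.uploadPart(uploadId, data);
            store.complete(uploadId);
            return true;
        } catch (RuntimeException e) {
            if (uploadId != null) {
                // Safe: the upload was opened, so there is something to abort.
                store.abort(uploadId);
            }
            return false;
        }
    }
}
```

   In this sketch the null check matters because the try block covers the part
   upload as well as the initiate call. If only the initiate call sits inside
   the try, the id is always still null when the catch runs.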

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
