This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f6effb30f [ASTERIXDB-3442][STO] Use PUT for single part upload
6f6effb30f is described below

commit 6f6effb30fb06d314cf26ff08485bc9687e6e010
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Thu Jun 20 14:32:48 2024 -0700

    [ASTERIXDB-3442][STO] Use PUT for single part upload
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    Use PUT instead of multipart upload when uploading
    single-part files.
    
    Change-Id: I30a8e035b3ee9a851ff5fb89b372fc836cdf1ca9
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18394
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Hussain Towaileb <[email protected]>
---
 .../asterix/cloud/CloudResettableInputStream.java  | 11 ++++++---
 .../cloud/clients/ICloudBufferedWriter.java        | 12 ++++++++--
 .../cloud/clients/aws/s3/S3BufferedWriter.java     | 27 ++++++++++++++++++----
 3 files changed, 41 insertions(+), 9 deletions(-)

diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index 885d612727..9e3502091d 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -31,9 +31,8 @@ import org.apache.logging.log4j.Logger;
 public class CloudResettableInputStream extends InputStream implements 
ICloudWriter {
     private static final Logger LOGGER = LogManager.getLogger();
     private final IWriteBufferProvider bufferProvider;
-    private ByteBuffer writeBuffer;
-
     private final ICloudBufferedWriter bufferedWriter;
+    private ByteBuffer writeBuffer;
 
     public CloudResettableInputStream(ICloudBufferedWriter bufferedWriter, 
IWriteBufferProvider bufferProvider) {
         this.bufferedWriter = bufferedWriter;
@@ -140,7 +139,13 @@ public class CloudResettableInputStream extends InputStream implements ICloudWri
                  * OR
                  * (2) nothing was written to the file at all to ensure 
writing empty file
                  */
-                uploadAndWait();
+                writeBuffer.flip();
+                try {
+                    bufferedWriter.uploadLast(this, writeBuffer);
+                } catch (Exception e) {
+                    LOGGER.error(e);
+                    throw HyracksDataException.create(e);
+                }
             }
             bufferedWriter.finish();
         } finally {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java
index 6b51964479..35047cede9 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.cloud.clients;
 
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -29,9 +30,16 @@ public interface ICloudBufferedWriter {
      *
      * @param stream stream
      * @param length length
-     * @return amount uploaded
      */
-    int upload(InputStream stream, int length) throws HyracksDataException;
+    void upload(InputStream stream, int length) throws HyracksDataException;
+
+    /**
+     * Upload the last content of the stream or buffer depending on whether a 
previous part was uploaded
+     *
+     * @param stream stream
+     * @param buffer buffer (should be used instead of stream if no previous 
bytes were written)
+     */
+    void uploadLast(InputStream stream, ByteBuffer buffer) throws 
HyracksDataException;
 
     /**
      * Checks whether the writer has not written anything
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
index 93be80c766..d0dda2a02c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.cloud.clients.aws.s3;
 
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -38,9 +39,11 @@ import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
 import software.amazon.awssdk.services.s3.model.CompletedPart;
 import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.UploadPartRequest;
 
 public class S3BufferedWriter implements ICloudBufferedWriter {
+    private static final String PUT_UPLOAD_ID = "putUploadId";
     private static final int MAX_RETRIES = 3;
 
     private static final Logger LOGGER = LogManager.getLogger();
@@ -65,7 +68,7 @@ public class S3BufferedWriter implements ICloudBufferedWriter {
     }
 
     @Override
-    public int upload(InputStream stream, int length) {
+    public void upload(InputStream stream, int length) {
         guardian.checkIsolatedWriteAccess(bucket, path);
         profiler.objectMultipartUpload();
         setUploadId();
@@ -73,8 +76,21 @@ public class S3BufferedWriter implements ICloudBufferedWriter {
                 
UploadPartRequest.builder().uploadId(uploadId).partNumber(partNumber).bucket(bucket).key(path).build();
         String etag = s3Client.uploadPart(upReq, 
RequestBody.fromInputStream(stream, length)).eTag();
         
partQueue.add(CompletedPart.builder().partNumber(partNumber).eTag(etag).build());
+        partNumber++;
+    }
 
-        return partNumber++;
+    @Override
+    public void uploadLast(InputStream stream, ByteBuffer buffer) {
+        if (uploadId == null) {
+            profiler.objectWrite();
+            PutObjectRequest request = 
PutObjectRequest.builder().bucket(bucket).key(path).build();
+            // TODO make retryable
+            s3Client.putObject(request, RequestBody.fromByteBuffer(buffer));
+            // Only set the uploadId if the putObject succeeds
+            uploadId = PUT_UPLOAD_ID;
+        } else {
+            upload(stream, buffer.limit());
+        }
     }
 
     @Override
@@ -86,9 +102,12 @@ public class S3BufferedWriter implements ICloudBufferedWriter {
     public void finish() throws HyracksDataException {
         if (uploadId == null) {
             throw new IllegalStateException("Cannot finish without writing any 
bytes");
+        } else if (PUT_UPLOAD_ID.equals(uploadId)) {
+            LOGGER.debug("FINISHED multipart upload as PUT for {}", path);
+            return;
         }
 
-        // A non-empty files, proceed with completing the multipart upload
+        // Finishing a multipart file. Proceed with completing the multipart 
upload
         CompletedMultipartUpload completedMultipartUpload = 
CompletedMultipartUpload.builder().parts(partQueue).build();
         CompleteMultipartUploadRequest completeMultipartUploadRequest = 
CompleteMultipartUploadRequest.builder()
                 
.bucket(bucket).key(path).uploadId(uploadId).multipartUpload(completedMultipartUpload).build();
@@ -119,7 +138,7 @@ public class S3BufferedWriter implements ICloudBufferedWriter {
 
     @Override
     public void abort() throws HyracksDataException {
-        if (uploadId == null) {
+        if (uploadId == null || PUT_UPLOAD_ID.equals(uploadId)) {
             return;
         }
         s3Client.abortMultipartUpload(

Reply via email to