This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6caa8a038f upload large s3 object by multi parts to overcome the 5GB 
limit of PutObject (#10263)
6caa8a038f is described below

commit 6caa8a038f328feed381511c382c8593e06138a0
Author: Xiaobing <[email protected]>
AuthorDate: Sat Feb 11 10:04:42 2023 -0800

    upload large s3 object by multi parts to overcome the 5GB limit of 
PutObject (#10263)
---
 .../apache/pinot/plugin/filesystem/S3Config.java   | 27 ++++++-
 .../apache/pinot/plugin/filesystem/S3PinotFS.java  | 90 ++++++++++++++++++++--
 .../pinot/plugin/filesystem/S3PinotFSTest.java     | 27 +++++++
 3 files changed, 136 insertions(+), 8 deletions(-)

diff --git 
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java
 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java
index b9b6c1ca35..9d2fd6d3c2 100644
--- 
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java
+++ 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java
@@ -22,14 +22,18 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import java.util.UUID;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.DataSizeUtils;
 
 
 /**
  * S3 related config
  */
 public class S3Config {
-
   private static final boolean DEFAULT_DISABLE_ACL = true;
+  // From https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html, the part number must be an integer
+  // between 1 and 10000, inclusive; the minimum allowed part size is 5MiB, except for the last part.
+  private static final long MULTI_PART_UPLOAD_MIN_PART_SIZE = 5 * 1024 * 1024;
+  public static final int MULTI_PART_UPLOAD_MAX_PART_NUM = 10000;
 
   public static final String ACCESS_KEY = "accessKey";
   public static final String SECRET_KEY = "secretKey";
@@ -49,6 +53,9 @@ public class S3Config {
   public static final String EXTERNAL_ID = "externalId";
   public static final String SESSION_DURATION_SECONDS = 
"sessionDurationSeconds";
   public static final String ASYNC_SESSION_UPDATED_ENABLED = 
"asyncSessionUpdateEnabled";
+  public static final String MIN_OBJECT_SIZE_FOR_MULTI_PART_UPLOAD = 
"minObjectSizeForMultiPartUpload";
+  public static final String MULTI_PART_UPLOAD_PART_SIZE = 
"multiPartUploadPartSize";
+  private static final String DEFAULT_MULTI_PART_UPLOAD_PART_SIZE = "128MB";
   public static final String DEFAULT_IAM_ROLE_BASED_ACCESS_ENABLED = "false";
   public static final String DEFAULT_SESSION_DURATION_SECONDS = "900";
   public static final String DEFAULT_ASYNC_SESSION_UPDATED_ENABLED = "true";
@@ -69,6 +76,8 @@ public class S3Config {
   private String _externalId;
   private int _sessionDurationSeconds;
   private boolean _asyncSessionUpdateEnabled;
+  private final long _minObjectSizeForMultiPartUpload;
+  private final long _multiPartUploadPartSize;
 
   public S3Config(PinotConfiguration pinotConfig) {
     _disableAcl = pinotConfig.getProperty(DISABLE_ACL_CONFIG_KEY, 
DEFAULT_DISABLE_ACL);
@@ -91,7 +100,13 @@ public class S3Config {
         Integer.parseInt(pinotConfig.getProperty(SESSION_DURATION_SECONDS, 
DEFAULT_SESSION_DURATION_SECONDS));
     _asyncSessionUpdateEnabled = Boolean.parseBoolean(
         pinotConfig.getProperty(ASYNC_SESSION_UPDATED_ENABLED, 
DEFAULT_ASYNC_SESSION_UPDATED_ENABLED));
-
+    // Non-positive values disable multipart upload.
+    _minObjectSizeForMultiPartUpload =
+        
DataSizeUtils.toBytes(pinotConfig.getProperty(MIN_OBJECT_SIZE_FOR_MULTI_PART_UPLOAD,
 "-1"));
+    _multiPartUploadPartSize = DataSizeUtils.toBytes(
+        pinotConfig.getProperty(MULTI_PART_UPLOAD_PART_SIZE, 
DEFAULT_MULTI_PART_UPLOAD_PART_SIZE));
+    Preconditions.checkArgument(_multiPartUploadPartSize > 
MULTI_PART_UPLOAD_MIN_PART_SIZE,
+        "The part size for multipart upload must be larger than 5MB");
     if (_iamRoleBasedAccess) {
       Preconditions.checkNotNull(_roleArn, "Must provide 'roleArn' if 
iamRoleBasedAccess is enabled");
     }
@@ -152,4 +167,12 @@ public class S3Config {
   public boolean isAsyncSessionUpdateEnabled() {
     return _asyncSessionUpdateEnabled;
   }
+
+  public long getMinObjectSizeForMultiPartUpload() {
+    return _minObjectSizeForMultiPartUpload;
+  }
+
+  public long getMultiPartUploadPartSize() {
+    return _multiPartUploadPartSize;
+  }
 }
diff --git 
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index ca2a938ef0..db976415f3 100644
--- 
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++ 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -18,10 +18,12 @@
  */
 package org.apache.pinot.plugin.filesystem;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
@@ -31,6 +33,7 @@ import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -52,8 +55,14 @@ import software.amazon.awssdk.core.sync.ResponseTransformer;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
 import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
 import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
 import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
@@ -69,6 +78,8 @@ import 
software.amazon.awssdk.services.s3.model.PutObjectResponse;
 import software.amazon.awssdk.services.s3.model.S3Exception;
 import software.amazon.awssdk.services.s3.model.S3Object;
 import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
 import software.amazon.awssdk.services.sts.StsClient;
 import 
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
 import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
@@ -87,6 +98,8 @@ public class S3PinotFS extends BasePinotFS {
   private ServerSideEncryption _serverSideEncryption = null;
   private String _ssekmsKeyId;
   private String _ssekmsEncryptionContext;
+  private long _minObjectSizeToUploadInParts;
+  private long _multiPartUploadPartSize;
 
   @Override
   public void init(PinotConfiguration config) {
@@ -135,6 +148,7 @@ public class S3PinotFS extends BasePinotFS {
         }
       }
       _s3Client = s3ClientBuilder.build();
+      setMultiPartUploadConfigs(s3Config);
     } catch (S3Exception e) {
       throw new RuntimeException("Could not initialize S3PinotFS", e);
     }
@@ -147,6 +161,7 @@ public class S3PinotFS extends BasePinotFS {
    */
   public void init(S3Client s3Client) {
     _s3Client = s3Client;
+    setMultiPartUploadConfigs(-1, -1);
   }
 
   /**
@@ -157,7 +172,9 @@ public class S3PinotFS extends BasePinotFS {
    */
   public void init(S3Client s3Client, String serverSideEncryption, 
PinotConfiguration serverSideEncryptionConfig) {
     _s3Client = s3Client;
-    setServerSideEncryption(serverSideEncryption, new 
S3Config(serverSideEncryptionConfig));
+    S3Config s3Config = new S3Config(serverSideEncryptionConfig);
+    setServerSideEncryption(serverSideEncryption, s3Config);
+    setMultiPartUploadConfigs(s3Config);
   }
 
   private void setServerSideEncryption(@Nullable String serverSideEncryption, 
S3Config s3Config) {
@@ -551,11 +568,72 @@ public class S3PinotFS extends BasePinotFS {
   @Override
   public void copyFromLocalFile(File srcFile, URI dstUri)
       throws Exception {
-    LOGGER.info("Copy {} from local to {}", srcFile.getAbsolutePath(), dstUri);
-    URI base = getBase(dstUri);
-    String prefix = sanitizePath(base.relativize(dstUri).getPath());
-    PutObjectRequest putObjectRequest = generatePutObjectRequest(dstUri, 
prefix);
-    _s3Client.putObject(putObjectRequest, srcFile.toPath());
+    if (_minObjectSizeToUploadInParts > 0 && srcFile.length() > 
_minObjectSizeToUploadInParts) {
+      LOGGER.info("Copy {} from local to {} in parts", 
srcFile.getAbsolutePath(), dstUri);
+      uploadFileInParts(srcFile, dstUri);
+    } else {
+      LOGGER.info("Copy {} from local to {}", srcFile.getAbsolutePath(), 
dstUri);
+      String prefix = 
sanitizePath(getBase(dstUri).relativize(dstUri).getPath());
+      PutObjectRequest putObjectRequest = generatePutObjectRequest(dstUri, 
prefix);
+      _s3Client.putObject(putObjectRequest, srcFile.toPath());
+    }
+  }
+
+  private void uploadFileInParts(File srcFile, URI dstUri)
+      throws Exception {
+    String bucket = dstUri.getHost();
+    String prefix = sanitizePath(getBase(dstUri).relativize(dstUri).getPath());
+    CreateMultipartUploadResponse multipartUpload =
+        
_s3Client.createMultipartUpload(CreateMultipartUploadRequest.builder().bucket(bucket).key(prefix).build());
+    String uploadId = multipartUpload.uploadId();
+    // Upload parts sequentially to overcome the 5GB limit of a single 
PutObject call.
+    // TODO: parts can be uploaded in parallel for higher throughput, given a 
thread pool.
+    try (FileInputStream inputStream = FileUtils.openInputStream(srcFile)) {
+      long totalUploaded = 0;
+      long fileSize = srcFile.length();
+      // The part number must start from 1 and no more than the max part num 
allowed, 10000 by default.
+      // The default configs can upload a single file of 1TB, so the if-branch 
should rarely happen.
+      int partNum = 1;
+      long partSizeToUse = _multiPartUploadPartSize;
+      if (partSizeToUse * S3Config.MULTI_PART_UPLOAD_MAX_PART_NUM < fileSize) {
+        partSizeToUse =
+            (fileSize + S3Config.MULTI_PART_UPLOAD_MAX_PART_NUM - 1) / 
S3Config.MULTI_PART_UPLOAD_MAX_PART_NUM;
+        LOGGER.info("Increased part size from {} to {} for large file size {} 
due to max allowed uploads {}",
+            _multiPartUploadPartSize, partSizeToUse, fileSize, 
S3Config.MULTI_PART_UPLOAD_MAX_PART_NUM);
+      }
+      List<CompletedPart> parts = new ArrayList<>();
+      while (totalUploaded < srcFile.length()) {
+        long nextPartSize = Math.min(partSizeToUse, fileSize - totalUploaded);
+        UploadPartResponse uploadPartResponse = _s3Client.uploadPart(
+            
UploadPartRequest.builder().bucket(bucket).key(prefix).uploadId(uploadId).partNumber(partNum).build(),
+            RequestBody.fromInputStream(inputStream, nextPartSize));
+        
parts.add(CompletedPart.builder().partNumber(partNum).eTag(uploadPartResponse.eTag()).build());
+        totalUploaded += nextPartSize;
+        LOGGER.debug("Uploaded part {} of size {}, with total uploaded {} and 
file size {}", partNum, nextPartSize,
+            totalUploaded, fileSize);
+        // Advance the part number for the next upload.
+        partNum++;
+      }
+      // complete the multipart upload
+      _s3Client.completeMultipartUpload(
+          
CompleteMultipartUploadRequest.builder().uploadId(uploadId).bucket(bucket).key(prefix)
+              
.multipartUpload(CompletedMultipartUpload.builder().parts(parts).build()).build());
+    } catch (Exception e) {
+      LOGGER.error("Failed to upload file {} to {} in parts. Abort upload 
request: {}", srcFile, dstUri, uploadId, e);
+      _s3Client.abortMultipartUpload(
+          
AbortMultipartUploadRequest.builder().uploadId(uploadId).bucket(bucket).key(prefix).build());
+      throw e;
+    }
+  }
+
+  private void setMultiPartUploadConfigs(S3Config s3Config) {
+    setMultiPartUploadConfigs(s3Config.getMinObjectSizeForMultiPartUpload(), 
s3Config.getMultiPartUploadPartSize());
+  }
+
+  @VisibleForTesting
+  void setMultiPartUploadConfigs(long minObjectSizeToUploadInParts, long 
multiPartUploadPartSize) {
+    _minObjectSizeToUploadInParts = minObjectSizeToUploadInParts;
+    _multiPartUploadPartSize = multiPartUploadPartSize;
   }
 
   @Override
diff --git 
a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
 
b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
index b877c29e91..7c6cc6dbf7 100644
--- 
a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
+++ 
b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
@@ -349,6 +349,33 @@ public class S3PinotFSTest {
     fileToDownload.deleteOnExit();
   }
 
+  @Test
+  public void testMultiPartUpload()
+      throws Exception {
+    String fileName = "copyFile.txt";
+
+    File fileToCopy = new 
File(getClass().getClassLoader().getResource(fileName).getFile());
+
+    // input file size is 20
+    _s3PinotFS.setMultiPartUploadConfigs(1, 3);
+    try {
+      _s3PinotFS.copyFromLocalFile(fileToCopy, 
URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)));
+    } finally {
+      // disable multipart upload again for the other UT cases.
+      _s3PinotFS.setMultiPartUploadConfigs(-1, 128 * 1024 * 1024);
+    }
+
+    HeadObjectResponse headObjectResponse = 
_s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, fileName));
+
+    Assert.assertEquals(headObjectResponse.contentLength(), (Long) 
fileToCopy.length());
+
+    File fileToDownload = new 
File("copyFile_download_multipart.txt").getAbsoluteFile();
+    _s3PinotFS.copyToLocalFile(URI.create(String.format(FILE_FORMAT, SCHEME, 
BUCKET, fileName)), fileToDownload);
+    Assert.assertEquals(fileToCopy.length(), fileToDownload.length());
+
+    fileToDownload.deleteOnExit();
+  }
+
   @Test
   public void testOpenFile()
       throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to