klsince commented on code in PR #10263:
URL: https://github.com/apache/pinot/pull/10263#discussion_r1103375647


##########
pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java:
##########
@@ -551,11 +569,74 @@ public void copyToLocalFile(URI srcUri, File dstFile)
   @Override
   public void copyFromLocalFile(File srcFile, URI dstUri)
       throws Exception {
-    LOGGER.info("Copy {} from local to {}", srcFile.getAbsolutePath(), dstUri);
-    URI base = getBase(dstUri);
-    String prefix = sanitizePath(base.relativize(dstUri).getPath());
-    PutObjectRequest putObjectRequest = generatePutObjectRequest(dstUri, prefix);
-    _s3Client.putObject(putObjectRequest, srcFile.toPath());
+    if (_minObjectSizeToUploadInParts > 0 && srcFile.length() > _minObjectSizeToUploadInParts) {
+      LOGGER.info("Copy {} from local to {} in parts", srcFile.getAbsolutePath(), dstUri);
+      uploadFileInParts(srcFile, dstUri);
+    } else {
+      LOGGER.info("Copy {} from local to {}", srcFile.getAbsolutePath(), dstUri);
+      String prefix = sanitizePath(getBase(dstUri).relativize(dstUri).getPath());
+      PutObjectRequest putObjectRequest = generatePutObjectRequest(dstUri, prefix);
+      _s3Client.putObject(putObjectRequest, srcFile.toPath());
+    }
+  }
+
+  private void uploadFileInParts(File srcFile, URI dstUri)
+      throws Exception {
+    String bucket = dstUri.getHost();
+    String prefix = sanitizePath(getBase(dstUri).relativize(dstUri).getPath());
+    CreateMultipartUploadResponse multipartUpload =
+        _s3Client.createMultipartUpload(CreateMultipartUploadRequest.builder().bucket(bucket).key(prefix).build());
+    String uploadId = multipartUpload.uploadId();
+    // Upload parts sequentially to overcome the 5GB limit of a single PutObject call.
+    // TODO: parts can be uploaded in parallel for higher throughput, given a thread pool.
+    try (FileInputStream inputStream = FileUtils.openInputStream(srcFile)) {
+      long totalUploaded = 0;
+      long fileSize = srcFile.length();
+      // The part number must start from 1 and no more than the max part num allowed, 10000 by default.
+      // The default configs can upload a single file of 1TB, so the if-branch should rarely happen.
+      int partNum = 1;
+      long partSizeToUse = _multiPartUploadPartSize;
+      if (partSizeToUse * _multiPartUploadMaxPartNumAllowed < fileSize) {
+        partSizeToUse = (fileSize + _multiPartUploadPartSize) / _multiPartUploadMaxPartNumAllowed;

Review Comment:
   duh... 😂



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to