gavinchou commented on code in PR #56487:
URL: https://github.com/apache/doris/pull/56487#discussion_r2384468465


##########
fe/fe-common/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -3630,4 +3630,10 @@ public static int metaServiceRpcRetryTimes() {
     public static long cloud_auto_snapshot_max_reversed_num = 35;
     @ConfField(mutable = true)
     public static long cloud_auto_snapshot_min_interval_seconds = 3600;
+    @ConfField(mutable = true)
+    public static long multi_part_upload_part_size_in_bytes = 512 * 1024 * 
1024L; // 512MB

Review Comment:
   What happens if this configuration is changed while a multipart upload is in progress?



##########
fe/fe-core/src/main/java/org/apache/doris/cloud/storage/DefaultRemote.java:
##########
@@ -190,6 +211,97 @@ public void putObject(File file, String key) throws 
DdlException {
         }
     }
 
+    @Override
+    public void multiPartUploadObject(File file, String key) throws 
DdlException {
+        long fileSize = file.length();
+        if (fileSize <= Config.multi_part_upload_part_size_in_bytes) {
+            putObject(file, key);
+            return;
+        }
+
+        long start = System.currentTimeMillis();
+        initClient();
+        initPool();
+        // create multipart upload
+        CreateMultipartUploadRequest createMultipartUploadRequest = 
CreateMultipartUploadRequest.builder()
+                .bucket(obj.getBucket()).key(key).build();
+        CreateMultipartUploadResponse multipartUpload = 
s3Client.createMultipartUpload(
+                createMultipartUploadRequest);
+        String uploadId = multipartUpload.uploadId();
+
+        // calculate part size
+        long partSize = Config.multi_part_upload_part_size_in_bytes;
+        if (partSize * MULTI_PART_UPLOAD_MAX_PART_NUM < fileSize) {
+            partSize = (fileSize + MULTI_PART_UPLOAD_MAX_PART_NUM - 1) / 
MULTI_PART_UPLOAD_MAX_PART_NUM;
+        }
+        int totalPartNum = (int) (fileSize / partSize) + (fileSize % partSize 
== 0 ? 0 : 1);
+        LOG.info("multi part upload file: {}, size: {}, part size: {}, total 
part num: {}",
+                file.getAbsolutePath(), fileSize, partSize, totalPartNum);
+
+        try (FileInputStream inputStream = FileUtils.openInputStream(file)) {
+            List<CompletedPart> parts = new ArrayList<>();
+            CountDownLatch latch = new CountDownLatch(totalPartNum);
+            int partNum = 1;
+            long totalUploaded = 0;
+            AtomicBoolean failed = new AtomicBoolean(false);
+
+            while (totalUploaded < fileSize && !failed.get()) {
+                long nextPartSize = Math.min(partSize, fileSize - 
totalUploaded);
+                int partNumConst = partNum;
+                POOL.submit(() -> {
+                    if (failed.get()) {
+                        return;
+                    }
+                    LOG.debug("start multi part upload for num: {}", 
partNumConst);
+                    UploadPartRequest uploadPartRequest = 
UploadPartRequest.builder().bucket(obj.getBucket()).key(key)
+                            
.uploadId(uploadId).partNumber(partNumConst).build();
+                    try {
+                        UploadPartResponse uploadPartResponse = 
s3Client.uploadPart(uploadPartRequest,
+                                RequestBody.fromInputStream(inputStream, 
nextPartSize));
+                        synchronized (parts) {
+                            
parts.add(CompletedPart.builder().partNumber(partNumConst).eTag(uploadPartResponse.eTag())
+                                    .build());
+                        }
+                        LOG.debug("finish multi part upload for num: {}", 
partNumConst);
+                    } catch (Exception e) {
+                        LOG.warn("Failed to multi part upload for num: {}", 
partNumConst, e);
+                        failed.set(true);
+                    } finally {
+                        latch.countDown();
+                    }
+                });
+                totalUploaded += nextPartSize;
+                partNum++;
+            }
+            for (int i = 0; i < Config.multi_part_upload_max_seconds / 10; 
i++) {

Review Comment:
   Log more details here, e.g. how long it has been waiting.
   The loop bound `i < Config.multi_part_upload_max_seconds / 10` does not seem
   to make sense, even though the latch wait interval is 10 sec.
   
   We may need to accumulate the time spent waiting on the latch and check
   whether the sum exceeds Config.multi_part_upload_max_seconds.



##########
fe/fe-common/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -3630,4 +3630,10 @@ public static int metaServiceRpcRetryTimes() {
     public static long cloud_auto_snapshot_max_reversed_num = 35;
     @ConfField(mutable = true)
     public static long cloud_auto_snapshot_min_interval_seconds = 3600;
+    @ConfField(mutable = true)
+    public static long multi_part_upload_part_size_in_bytes = 512 * 1024 * 
1024L; // 512MB
+    @ConfField(mutable = true)
+    public static int multi_part_upload_max_seconds = 1200; // 20 minutes

Review Comment:
   Make it several hours if it covers the whole procedure of "begin, upload,
   complete".



##########
fe/fe-core/src/main/java/org/apache/doris/cloud/storage/DefaultRemote.java:
##########
@@ -190,6 +211,97 @@ public void putObject(File file, String key) throws 
DdlException {
         }
     }
 
+    @Override
+    public void multiPartUploadObject(File file, String key) throws 
DdlException {
+        long fileSize = file.length();
+        if (fileSize <= Config.multi_part_upload_part_size_in_bytes) {
+            putObject(file, key);
+            return;
+        }
+
+        long start = System.currentTimeMillis();
+        initClient();
+        initPool();
+        // create multipart upload
+        CreateMultipartUploadRequest createMultipartUploadRequest = 
CreateMultipartUploadRequest.builder()
+                .bucket(obj.getBucket()).key(key).build();
+        CreateMultipartUploadResponse multipartUpload = 
s3Client.createMultipartUpload(
+                createMultipartUploadRequest);
+        String uploadId = multipartUpload.uploadId();
+
+        // calculate part size
+        long partSize = Config.multi_part_upload_part_size_in_bytes;
+        if (partSize * MULTI_PART_UPLOAD_MAX_PART_NUM < fileSize) {
+            partSize = (fileSize + MULTI_PART_UPLOAD_MAX_PART_NUM - 1) / 
MULTI_PART_UPLOAD_MAX_PART_NUM;
+        }
+        int totalPartNum = (int) (fileSize / partSize) + (fileSize % partSize 
== 0 ? 0 : 1);
+        LOG.info("multi part upload file: {}, size: {}, part size: {}, total 
part num: {}",
+                file.getAbsolutePath(), fileSize, partSize, totalPartNum);
+
+        try (FileInputStream inputStream = FileUtils.openInputStream(file)) {
+            List<CompletedPart> parts = new ArrayList<>();
+            CountDownLatch latch = new CountDownLatch(totalPartNum);
+            int partNum = 1;
+            long totalUploaded = 0;
+            AtomicBoolean failed = new AtomicBoolean(false);
+
+            while (totalUploaded < fileSize && !failed.get()) {
+                long nextPartSize = Math.min(partSize, fileSize - 
totalUploaded);
+                int partNumConst = partNum;
+                POOL.submit(() -> {
+                    if (failed.get()) {
+                        return;
+                    }
+                    LOG.debug("start multi part upload for num: {}", 
partNumConst);
+                    UploadPartRequest uploadPartRequest = 
UploadPartRequest.builder().bucket(obj.getBucket()).key(key)
+                            
.uploadId(uploadId).partNumber(partNumConst).build();
+                    try {
+                        UploadPartResponse uploadPartResponse = 
s3Client.uploadPart(uploadPartRequest,
+                                RequestBody.fromInputStream(inputStream, 
nextPartSize));
+                        synchronized (parts) {
+                            
parts.add(CompletedPart.builder().partNumber(partNumConst).eTag(uploadPartResponse.eTag())
+                                    .build());
+                        }
+                        LOG.debug("finish multi part upload for num: {}", 
partNumConst);
+                    } catch (Exception e) {
+                        LOG.warn("Failed to multi part upload for num: {}", 
partNumConst, e);

Review Comment:
   Print all info about the object, e.g. path, size, upload id, etc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to