This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c0f45fb324398da9307344496c098d65ba2de178
Author: Andrea Cosentino <[email protected]>
AuthorDate: Fri Apr 2 19:22:27 2021 +0200

    CAMEL-16185 - AWS S3: improve multipart support - streaming upload
---
 .../aws2/s3/stream/AWS2S3StreamUploadProducer.java | 31 ++++++++++++++++++++--
 .../S3StreamUploadOperationLocalstackTest.java     |  1 +
 2 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
index 4a4785f..32c6f28 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/stream/AWS2S3StreamUploadProducer.java
@@ -21,6 +21,8 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.Endpoint;
@@ -65,6 +67,29 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
     UUID id;
     String dynamicKeyName;
     private transient String s3ProducerToString;
+    private ScheduledExecutorService timeoutCheckerExecutorService;
+    private boolean timeout;
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        timeoutCheckerExecutorService
+                = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
+                        "timeout_checker");
+        timeoutCheckerExecutorService.scheduleAtFixedRate(new 
AggregationIntervalTask(), 1, 1, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Background task that triggers completion based on interval.
+     */
+    private final class AggregationIntervalTask implements Runnable {
+
+        @Override
+        public void run() {
+            timeout = true;
+            LOG.info("timeout triggered");
+        }
+    }
 
     public AWS2S3StreamUploadProducer(final Endpoint endpoint) {
         super(endpoint);
@@ -132,6 +157,7 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
         try {
             if (buffer.size() >= getConfiguration().getBatchSize()
                     || index.get() == 
getConfiguration().getBatchMessageNumber()) {
+                LOG.info("Timeout " + timeout);
 
                 UploadPartRequest uploadRequest = 
UploadPartRequest.builder().bucket(getConfiguration().getBucketName())
                         .key(dynamicKeyName).uploadId(initResponse.uploadId())
@@ -147,7 +173,7 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
                 part.getAndIncrement();
             }
 
-            if (index.get() == getConfiguration().getBatchMessageNumber()) {
+            if (index.get() == getConfiguration().getBatchMessageNumber() || 
timeout) {
                 CompletedMultipartUpload completeMultipartUpload
                         = 
CompletedMultipartUpload.builder().parts(completedParts).build();
                 CompleteMultipartUploadRequest compRequest
@@ -157,13 +183,14 @@ public class AWS2S3StreamUploadProducer extends DefaultProducer {
                                 .build();
 
                 CompleteMultipartUploadResponse uploadResult = 
getEndpoint().getS3Client().completeMultipartUpload(compRequest);
-                LOG.trace("Completed upload for the part {} with etag {} at 
index {}", part, uploadResult.eTag(),
+                LOG.info("Completed upload for the part {} with etag {} at 
index {}", part, uploadResult.eTag(),
                         index);
                 Message message = getMessageForResponse(exchange);
                 message.setHeader(AWS2S3Constants.E_TAG, uploadResult.eTag());
                 if (uploadResult.versionId() != null) {
                     message.setHeader(AWS2S3Constants.VERSION_ID, 
uploadResult.versionId());
                 }
+                timeout = false;
                 index.getAndSet(0);
             }
 
diff --git a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java
index 87cca16..6a37b9ad 100644
--- a/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java
+++ b/components/camel-aws/camel-aws2-s3/src/test/java/org/apache/camel/component/aws2/s3/localstack/S3StreamUploadOperationLocalstackTest.java
@@ -57,6 +57,7 @@ public class S3StreamUploadOperationLocalstackTest extends Aws2S3BaseTest {
             }
         });
 
+        Thread.sleep(30000);
         List<S3Object> resp = ex.getMessage().getBody(List.class);
         assertEquals(40, resp.size());
     }

Reply via email to