This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 173049e4ab27fd983647339e62cfb539ecb72e0b
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Thu Sep 30 15:28:50 2021 +0200

    CAMEL-17024: camel-aws2-s3 - Determine content-length in a smarter way
---
 .../camel/component/aws2/s3/AWS2S3Producer.java    | 105 ++++++++++-----------
 .../camel/component/aws2/s3/utils/AWS2S3Utils.java |  28 ++++--
 2 files changed, 72 insertions(+), 61 deletions(-)

diff --git 
a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java
 
b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java
index 95f6c6b..8427993 100644
--- 
a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java
+++ 
b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java
@@ -37,7 +37,6 @@ import org.apache.camel.support.DefaultProducer;
 import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
@@ -78,14 +77,11 @@ import 
software.amazon.awssdk.services.s3.presigner.model.PresignedGetObjectRequ
 /**
  * A Producer which sends messages to the Amazon Web Service Simple Storage 
Service
  * <a href="http://aws.amazon.com/s3/";>AWS S3</a>
- *
  */
 public class AWS2S3Producer extends DefaultProducer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AWS2S3Producer.class);
 
-    private transient String s3ProducerToString;
-
     public AWS2S3Producer(final Endpoint endpoint) {
         super(endpoint);
     }
@@ -252,38 +248,59 @@ public class AWS2S3Producer extends DefaultProducer {
     }
 
     public void processSingleOp(final Exchange exchange) throws Exception {
+        PutObjectRequest.Builder putObjectRequest = PutObjectRequest.builder();
 
         Map<String, String> objectMetadata = determineMetadata(exchange);
 
-        File filePayload = null;
-        InputStream is = null;
-        ByteArrayOutputStream baos = null;
+        // the content-length may already be known
+        long contentLength = 
Long.parseLong(objectMetadata.getOrDefault(Exchange.CONTENT_LENGTH, "-1"));
+
         Object obj = exchange.getIn().getMandatoryBody();
-        PutObjectRequest.Builder putObjectRequest = PutObjectRequest.builder();
-        // Need to check if the message body is WrappedFile
-        if (obj instanceof WrappedFile) {
-            obj = ((WrappedFile<?>) obj).getFile();
-        }
-        if (obj instanceof File) {
-            filePayload = (File) obj;
-            is = new FileInputStream(filePayload);
-        } else {
-            is = exchange.getIn().getMandatoryBody(InputStream.class);
-            if (objectMetadata.containsKey(Exchange.CONTENT_LENGTH)) {
-                if (objectMetadata.get("Content-Length").equals("0")
-                        && 
ObjectHelper.isEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
-                    LOG.debug("The content length is not defined. It needs to 
be determined by reading the data into memory");
-                    baos = AWS2S3Utils.determineLengthInputStream(is);
-                    objectMetadata.put("Content-Length", 
String.valueOf(baos.size()));
-                    is = new ByteArrayInputStream(baos.toByteArray());
-                } else {
-                    if 
(ObjectHelper.isNotEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
-                        objectMetadata.put("Content-Length", 
exchange.getProperty(Exchange.CONTENT_LENGTH, String.class));
+        InputStream inputStream = null;
+        File filePayload = null;
+
+        try {
+            // Need to check if the message body is WrappedFile
+            if (obj instanceof WrappedFile) {
+                obj = ((WrappedFile<?>) obj).getFile();
+            }
+            if (obj instanceof File) {
+                filePayload = (File) obj;
+                inputStream = new FileInputStream(filePayload);
+                contentLength = filePayload.length();
+            } else {
+                inputStream = 
exchange.getIn().getMandatoryBody(InputStream.class);
+                if (contentLength <= 0) {
+                    contentLength = 
AWS2S3Utils.determineLengthInputStream(inputStream);
+                    if (contentLength == -1) {
+                        // fallback to read into memory to calculate length
+                        LOG.debug(
+                                "The content length is not defined. It needs 
to be determined by reading the data into memory");
+                        ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+                        IOHelper.copyAndCloseInput(inputStream, baos);
+                        byte[] arr = baos.toByteArray();
+                        contentLength = arr.length;
+                        inputStream = new ByteArrayInputStream(arr);
                     }
                 }
+                if (contentLength > 0) {
+                    objectMetadata.put(Exchange.CONTENT_LENGTH, 
String.valueOf(contentLength));
+                }
             }
+
+            doPutObject(exchange, putObjectRequest, objectMetadata, 
inputStream);
+        } finally {
+            IOHelper.close(inputStream);
         }
 
+        if (getConfiguration().isDeleteAfterWrite() && filePayload != null) {
+            FileUtil.deleteFile(filePayload);
+        }
+    }
+
+    private void doPutObject(
+            Exchange exchange, PutObjectRequest.Builder putObjectRequest, 
Map<String, String> objectMetadata,
+            InputStream inputStream) {
         final String bucketName = AWS2S3Utils.determineBucketName(exchange, 
getConfiguration());
         final String key = AWS2S3Utils.determineKey(exchange, 
getConfiguration());
         putObjectRequest.bucket(bucketName).key(key).metadata(objectMetadata);
@@ -334,7 +351,7 @@ public class AWS2S3Producer extends DefaultProducer {
         LOG.trace("Put object [{}] from exchange [{}]...", putObjectRequest, 
exchange);
 
         PutObjectResponse putObjectResult = 
getEndpoint().getS3Client().putObject(putObjectRequest.build(),
-                
RequestBody.fromBytes(SdkBytes.fromInputStream(is).asByteArray()));
+                
RequestBody.fromBytes(SdkBytes.fromInputStream(inputStream).asByteArray()));
 
         LOG.trace("Received result [{}]", putObjectResult);
 
@@ -343,12 +360,6 @@ public class AWS2S3Producer extends DefaultProducer {
         if (putObjectResult.versionId() != null) {
             message.setHeader(AWS2S3Constants.VERSION_ID, 
putObjectResult.versionId());
         }
-
-        IOHelper.close(is);
-
-        if (getConfiguration().isDeleteAfterWrite() && filePayload != null) {
-            FileUtil.deleteFile(filePayload);
-        }
     }
 
     private void copyObject(S3Client s3Client, Exchange exchange) throws 
InvalidPayloadException {
@@ -406,6 +417,7 @@ public class AWS2S3Producer extends DefaultProducer {
     private void deleteObject(S3Client s3Client, Exchange exchange) throws 
InvalidPayloadException {
         final String bucketName = AWS2S3Utils.determineBucketName(exchange, 
getConfiguration());
         final String sourceKey = AWS2S3Utils.determineKey(exchange, 
getConfiguration());
+
         if (getConfiguration().isPojoRequest()) {
             Object payload = exchange.getIn().getMandatoryBody();
             if (payload instanceof DeleteObjectRequest) {
@@ -414,7 +426,6 @@ public class AWS2S3Producer extends DefaultProducer {
                 message.setBody(true);
             }
         } else {
-
             DeleteObjectRequest.Builder deleteObjectRequest = 
DeleteObjectRequest.builder().bucket(bucketName).key(sourceKey);
             s3Client.deleteObject(deleteObjectRequest.build());
 
@@ -441,7 +452,6 @@ public class AWS2S3Producer extends DefaultProducer {
                 message.setBody(resp);
             }
         } else {
-
             DeleteBucketRequest.Builder deleteBucketRequest = 
DeleteBucketRequest.builder().bucket(bucketName);
             DeleteBucketResponse resp = 
s3Client.deleteBucket(deleteBucketRequest.build());
 
@@ -451,7 +461,6 @@ public class AWS2S3Producer extends DefaultProducer {
     }
 
     private void getObject(S3Client s3Client, Exchange exchange) throws 
InvalidPayloadException {
-
         if (getConfiguration().isPojoRequest()) {
             Object payload = exchange.getIn().getMandatoryBody();
             if (payload instanceof GetObjectRequest) {
@@ -486,7 +495,6 @@ public class AWS2S3Producer extends DefaultProducer {
                 message.setBody(res);
             }
         } else {
-
             if (ObjectHelper.isEmpty(rangeStart) || 
ObjectHelper.isEmpty(rangeEnd)) {
                 throw new IllegalArgumentException(
                         "A Range start and range end header must be configured 
to perform a range get operation.");
@@ -512,7 +520,6 @@ public class AWS2S3Producer extends DefaultProducer {
                 message.setBody(objectList.contents());
             }
         } else {
-
             ListObjectsResponse objectList = 
s3Client.listObjects(ListObjectsRequest.builder().bucket(bucketName).build());
 
             Message message = getMessageForResponse(exchange);
@@ -576,32 +583,32 @@ public class AWS2S3Producer extends DefaultProducer {
 
         Long contentLength = 
exchange.getIn().getHeader(AWS2S3Constants.CONTENT_LENGTH, Long.class);
         if (contentLength != null) {
-            objectMetadata.put("Content-Length", 
String.valueOf(contentLength));
+            objectMetadata.put(Exchange.CONTENT_LENGTH, 
String.valueOf(contentLength));
         }
 
         String contentType = 
exchange.getIn().getHeader(AWS2S3Constants.CONTENT_TYPE, String.class);
         if (contentType != null) {
-            objectMetadata.put("Content-Type", String.valueOf(contentType));
+            objectMetadata.put("Content-Type", contentType);
         }
 
         String cacheControl = 
exchange.getIn().getHeader(AWS2S3Constants.CACHE_CONTROL, String.class);
         if (cacheControl != null) {
-            objectMetadata.put("Cache-Control", String.valueOf(cacheControl));
+            objectMetadata.put("Cache-Control", cacheControl);
         }
 
         String contentDisposition = 
exchange.getIn().getHeader(AWS2S3Constants.CONTENT_DISPOSITION, String.class);
         if (contentDisposition != null) {
-            objectMetadata.put("Content-Disposition", 
String.valueOf(contentDisposition));
+            objectMetadata.put("Content-Disposition", contentDisposition);
         }
 
         String contentEncoding = 
exchange.getIn().getHeader(AWS2S3Constants.CONTENT_ENCODING, String.class);
         if (contentEncoding != null) {
-            objectMetadata.put("Content-Encoding", 
String.valueOf(contentEncoding));
+            objectMetadata.put("Content-Encoding", contentEncoding);
         }
 
         String contentMD5 = 
exchange.getIn().getHeader(AWS2S3Constants.CONTENT_MD5, String.class);
         if (contentMD5 != null) {
-            objectMetadata.put("Content-Md5", String.valueOf(contentMD5));
+            objectMetadata.put("Content-Md5", contentMD5);
         }
 
         return objectMetadata;
@@ -612,14 +619,6 @@ public class AWS2S3Producer extends DefaultProducer {
     }
 
     @Override
-    public String toString() {
-        if (s3ProducerToString == null) {
-            s3ProducerToString = "S3Producer[" + 
URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
-        }
-        return s3ProducerToString;
-    }
-
-    @Override
     public AWS2S3Endpoint getEndpoint() {
         return (AWS2S3Endpoint) super.getEndpoint();
     }
diff --git 
a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/utils/AWS2S3Utils.java
 
b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/utils/AWS2S3Utils.java
index 3f8ab3d..d434ff9 100644
--- 
a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/utils/AWS2S3Utils.java
+++ 
b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/utils/AWS2S3Utils.java
@@ -16,7 +16,7 @@
  */
 package org.apache.camel.component.aws2.s3.utils;
 
-import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -80,14 +80,26 @@ public final class AWS2S3Utils {
         }
     }
 
-    public static ByteArrayOutputStream determineLengthInputStream(InputStream 
is) throws IOException {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        byte[] bytes = new byte[1024];
-        int count;
-        while ((count = is.read(bytes)) > 0) {
-            out.write(bytes, 0, count);
+    public static long determineLengthInputStream(InputStream is) throws 
IOException {
+        if (!is.markSupported()) {
+            return -1;
         }
-        return out;
+        if (is instanceof ByteArrayInputStream) {
+            return is.available();
+        }
+        long size = 0;
+        try {
+            is.mark(1024);
+            int i = is.available();
+            while (i > 0) {
+                long skip = is.skip(i);
+                size += skip;
+                i = is.available();
+            }
+        } finally {
+            is.reset();
+        }
+        return size;
     }
 
     public static String determineKey(final Exchange exchange, 
AWS2S3Configuration configuration) {

Reply via email to