[ 
https://issues.apache.org/jira/browse/CAMEL-11698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16138270#comment-16138270
 ] 

Andrea Cosentino commented on CAMEL-11698:
------------------------------------------

We add this in the past 

https://issues.apache.org/jira/browse/CAMEL-11697

With the autocloseBody option true we close the s3Object when the exchange is 
complete:

{code}
    public Exchange createExchange(ExchangePattern pattern, S3Object s3Object) {
        LOG.trace("Getting object with key [{}] from bucket [{}]...", 
s3Object.getKey(), s3Object.getBucketName());

        ObjectMetadata objectMetadata = s3Object.getObjectMetadata();

        LOG.trace("Got object [{}]", s3Object);

        Exchange exchange = super.createExchange(pattern);
        Message message = exchange.getIn();

        if (configuration.isIncludeBody()) {
            message.setBody(s3Object.getObjectContent());
        } else {
            message.setBody(null);
        }

        message.setHeader(S3Constants.KEY, s3Object.getKey());
        message.setHeader(S3Constants.BUCKET_NAME, s3Object.getBucketName());
        message.setHeader(S3Constants.E_TAG, objectMetadata.getETag());
        message.setHeader(S3Constants.LAST_MODIFIED, 
objectMetadata.getLastModified());
        message.setHeader(S3Constants.VERSION_ID, 
objectMetadata.getVersionId());
        message.setHeader(S3Constants.CONTENT_TYPE, 
objectMetadata.getContentType());
        message.setHeader(S3Constants.CONTENT_MD5, 
objectMetadata.getContentMD5());
        message.setHeader(S3Constants.CONTENT_LENGTH, 
objectMetadata.getContentLength());
        message.setHeader(S3Constants.CONTENT_ENCODING, 
objectMetadata.getContentEncoding());
        message.setHeader(S3Constants.CONTENT_DISPOSITION, 
objectMetadata.getContentDisposition());
        message.setHeader(S3Constants.CACHE_CONTROL, 
objectMetadata.getCacheControl());
        message.setHeader(S3Constants.S3_HEADERS, 
objectMetadata.getRawMetadata());
        message.setHeader(S3Constants.SERVER_SIDE_ENCRYPTION, 
objectMetadata.getSSEAlgorithm());

        /**
         * If includeBody != true, it is safe to close the object here.  If 
includeBody == true,
         * the caller is responsible for closing the stream and object once the 
body has been fully consumed.
         * As of 2.17, the consumer does not close the stream or object on 
commit.
         */
        if (!configuration.isIncludeBody()) {
            try {
                s3Object.close();
            } catch (IOException e) {
            }
        } else {
            if (configuration.isAutocloseBody()) {
                exchange.addOnCompletion(new SynchronizationAdapter() {
                    @Override
                    public void onDone(Exchange exchange) {
                        try {
                            s3Object.close();
                        } catch (IOException e) {
                        }
                    }
                });
            }
        }

        return exchange;
{code}

We need to improve error handling somewhere by the way

> S3 Consumer does not close S3 Object Input Streams and this causes HTTP 
> connection leaks
> ----------------------------------------------------------------------------------------
>
>                 Key: CAMEL-11698
>                 URL: https://issues.apache.org/jira/browse/CAMEL-11698
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-aws
>    Affects Versions: 2.14.3, 2.19.2
>            Reporter: MykhailoVlakh
>            Assignee: Andrea Cosentino
>         Attachments: CustomS3Consumer.java
>
>
> It looks like S3Consumer does nothing to prevent HTTP Connection leaks that 
> can easily happen if some exception is thrown while it is generates a batch 
> of exchanges and sends them for processing. Also we can lose HTTP Connections 
> if our route does not close S3 Object Input Streams which can easily happen.
> Due to this issue s3 consumer may works some time and then start failing with 
> the following exceptions:
> {code}
> com.amazonaws.AmazonClientException: Unable to execute HTTP request: Timeout 
> waiting for connection from pool
>       at 
> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:544)
>       at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:273)
>       at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3660)
>       at 
> com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1133)
>       at 
> com.amazonaws.services.s3.AmazonS3EncryptionClient.access$201(AmazonS3EncryptionClient.java:65)
>       at 
> com.amazonaws.services.s3.AmazonS3EncryptionClient$S3DirectImpl.getObject(AmazonS3EncryptionClient.java:524)
>       at 
> com.amazonaws.services.s3.internal.crypto.S3CryptoModuleAE.getObjectSecurely(S3CryptoModuleAE.java:106)
>       at 
> com.amazonaws.services.s3.internal.crypto.CryptoModuleDispatcher.getObjectSecurely(CryptoModuleDispatcher.java:114)
>       at 
> com.amazonaws.services.s3.AmazonS3EncryptionClient.getObject(AmazonS3EncryptionClient.java:427)
>       at 
> com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1005)
>       at 
> org.apache.camel.component.aws.s3.S3Consumer.createExchanges(S3Consumer.java:112)
>       at org.apache.camel.component.aws.s3.S3Consumer.poll(S3Consumer.java:93)
>       at 
> org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:187)
>       at 
> org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:114)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> I found 3 week points in the way S3Consumer is implemented:
> 1. It does not handle exceptions in the poll() method where it reads a single 
> s3 object which means that s3 object stream can be left opened forever in 
> case of some fault;
> 2. It also does not handle exceptions in the createExchanges method where it 
> populates a list of exchanges based on the list of s3 objects available in 
> the bucket. If for example we want to consumer 10 files in a pool and 
> getObject call for the file 10 failed due to whatever reason steams for 9 
> objects that are already opened will be lost;
> 3. In order to make sure that we always close all the streams and to not 
> force user to do this all the time the implementation of the processBatch 
> method should be also improved to close all the opened streams in the finally 
> block.
> In order to resolve issues 2 and 3 in my current project (the issue 1 is not 
> affecting me because I do not use that feature) I implemented a custom 
> extension of the native S3Consumer that I want to share with you. It will 
> give you the idea of the required changes that need to be applied to fix 
> these issues. I hope it will be useful.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to