[ https://issues.apache.org/jira/browse/CAMEL-11698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16138270#comment-16138270 ]
Andrea Cosentino commented on CAMEL-11698:
------------------------------------------
We added this in the past:
https://issues.apache.org/jira/browse/CAMEL-11697
With the autocloseBody option set to true, we close the S3Object when the exchange is
complete:
{code}
public Exchange createExchange(ExchangePattern pattern, S3Object s3Object) {
    LOG.trace("Getting object with key [{}] from bucket [{}]...", s3Object.getKey(), s3Object.getBucketName());

    ObjectMetadata objectMetadata = s3Object.getObjectMetadata();

    LOG.trace("Got object [{}]", s3Object);

    Exchange exchange = super.createExchange(pattern);
    Message message = exchange.getIn();

    if (configuration.isIncludeBody()) {
        message.setBody(s3Object.getObjectContent());
    } else {
        message.setBody(null);
    }

    message.setHeader(S3Constants.KEY, s3Object.getKey());
    message.setHeader(S3Constants.BUCKET_NAME, s3Object.getBucketName());
    message.setHeader(S3Constants.E_TAG, objectMetadata.getETag());
    message.setHeader(S3Constants.LAST_MODIFIED, objectMetadata.getLastModified());
    message.setHeader(S3Constants.VERSION_ID, objectMetadata.getVersionId());
    message.setHeader(S3Constants.CONTENT_TYPE, objectMetadata.getContentType());
    message.setHeader(S3Constants.CONTENT_MD5, objectMetadata.getContentMD5());
    message.setHeader(S3Constants.CONTENT_LENGTH, objectMetadata.getContentLength());
    message.setHeader(S3Constants.CONTENT_ENCODING, objectMetadata.getContentEncoding());
    message.setHeader(S3Constants.CONTENT_DISPOSITION, objectMetadata.getContentDisposition());
    message.setHeader(S3Constants.CACHE_CONTROL, objectMetadata.getCacheControl());
    message.setHeader(S3Constants.S3_HEADERS, objectMetadata.getRawMetadata());
    message.setHeader(S3Constants.SERVER_SIDE_ENCRYPTION, objectMetadata.getSSEAlgorithm());

    /**
     * If includeBody != true, it is safe to close the object here. If includeBody == true,
     * the caller is responsible for closing the stream and object once the body has been
     * fully consumed. As of 2.17, the consumer does not close the stream or object on commit.
     */
    if (!configuration.isIncludeBody()) {
        try {
            s3Object.close();
        } catch (IOException e) {
            // ignore failures on close
        }
    } else {
        if (configuration.isAutocloseBody()) {
            exchange.addOnCompletion(new SynchronizationAdapter() {
                @Override
                public void onDone(Exchange exchange) {
                    try {
                        s3Object.close();
                    } catch (IOException e) {
                        // ignore failures on close
                    }
                }
            });
        }
    }

    return exchange;
}
{code}
By the way, we need to improve the error handling somewhere.
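For reference, the option is enabled on the consumer endpoint URI; a minimal sketch (the bucket name and client bean name here are hypothetical):
{code}
aws-s3://my-bucket?amazonS3Client=#s3Client&autocloseBody=true
{code}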
> S3 Consumer does not close S3 Object Input Streams and this causes HTTP
> connection leaks
> ----------------------------------------------------------------------------------------
>
> Key: CAMEL-11698
> URL: https://issues.apache.org/jira/browse/CAMEL-11698
> Project: Camel
> Issue Type: Bug
> Components: camel-aws
> Affects Versions: 2.14.3, 2.19.2
> Reporter: MykhailoVlakh
> Assignee: Andrea Cosentino
> Attachments: CustomS3Consumer.java
>
>
> It looks like S3Consumer does nothing to prevent HTTP connection leaks, which
> can easily happen if some exception is thrown while it generates a batch
> of exchanges and sends them for processing. We can also lose HTTP connections
> if our route does not close the S3 Object input streams, which can easily happen.
> Due to this issue, the S3 consumer may work for some time and then start failing
> with the following exceptions:
> {code}
> com.amazonaws.AmazonClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
>     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:544)
>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:273)
>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3660)
>     at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1133)
>     at com.amazonaws.services.s3.AmazonS3EncryptionClient.access$201(AmazonS3EncryptionClient.java:65)
>     at com.amazonaws.services.s3.AmazonS3EncryptionClient$S3DirectImpl.getObject(AmazonS3EncryptionClient.java:524)
>     at com.amazonaws.services.s3.internal.crypto.S3CryptoModuleAE.getObjectSecurely(S3CryptoModuleAE.java:106)
>     at com.amazonaws.services.s3.internal.crypto.CryptoModuleDispatcher.getObjectSecurely(CryptoModuleDispatcher.java:114)
>     at com.amazonaws.services.s3.AmazonS3EncryptionClient.getObject(AmazonS3EncryptionClient.java:427)
>     at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1005)
>     at org.apache.camel.component.aws.s3.S3Consumer.createExchanges(S3Consumer.java:112)
>     at org.apache.camel.component.aws.s3.S3Consumer.poll(S3Consumer.java:93)
>     at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:187)
>     at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:114)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> I found 3 weak points in the way S3Consumer is implemented:
> 1. It does not handle exceptions in the poll() method, where it reads a single
> S3 object, which means that the S3 object stream can be left open forever in
> case of some fault;
> 2. It also does not handle exceptions in the createExchanges() method, where it
> populates a list of exchanges based on the list of S3 objects available in
> the bucket. If, for example, we want to consume 10 files in one poll and the
> getObject call for file 10 fails for whatever reason, the streams for the 9
> objects that are already open will be leaked;
> 3. To make sure that we always close all the streams, and to not force the
> user to do this every time, the implementation of the processBatch() method
> should also be improved to close all the opened streams in a finally block.
> To resolve issues 2 and 3 in my current project (issue 1 does not affect me
> because I do not use that feature), I implemented a custom extension of the
> native S3Consumer that I want to share with you. It should give you an idea
> of the changes required to fix these issues. I hope it will be useful.
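The batch-cleanup pattern described in weak point 2 of the report can be sketched in plain Java, outside Camel (TrackedStream is a hypothetical stand-in for the S3 object stream; this is an illustration of the close-on-failure idea, not the actual S3Consumer code):
{code}
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class BatchOpenSketch {

    // Hypothetical stand-in for an S3 object stream; only tracks close().
    static class TrackedStream implements Closeable {
        boolean closed;
        @Override
        public void close() {
            closed = true;
        }
    }

    // Fills 'opened' with streams, simulating a getObject failure at index
    // failAt. On failure, every already-opened stream is closed before the
    // exception is rethrown, so nothing leaks.
    static void openAll(List<TrackedStream> opened, int total, int failAt) throws IOException {
        try {
            for (int i = 0; i < total; i++) {
                if (i == failAt) {
                    throw new IOException("getObject failed for object " + i);
                }
                opened.add(new TrackedStream());
            }
        } catch (IOException e) {
            for (TrackedStream s : opened) {
                s.close(); // best-effort cleanup before rethrowing
            }
            throw e;
        }
    }
}
{code}
With 10 objects and a failure on the last one, the 9 streams opened so far are all closed before the exception propagates, which is exactly the behavior the report asks createExchanges() to have.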
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)