[
https://issues.apache.org/jira/browse/CAMEL-11698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16139900#comment-16139900
]
Steve Loughran commented on CAMEL-11698:
----------------------------------------
Don't know if it's relevant here, but in Hadoop s3a we decide whether to abort
or close the connection based on the amount of remaining data. In close(), the
AWS S3 client will read() to the end of the data in order to recycle the
HTTP/1.1 connection. This is OK for small files, but not for multi-GB files
where you are closing the stream to jump around with seek().
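
A minimal sketch of that decision, assuming a stream that exposes both abort()
and close() (the AWS SDK's S3ObjectInputStream does). The AbortableStream
interface, the release() helper and the 64 KB threshold are hypothetical names
and values chosen for illustration; they are not the s3a implementation.

```java
final class AbortOrClose {
    /** Hypothetical stand-in for an S3 object stream; not a real SDK type. */
    interface AbortableStream extends AutoCloseable {
        /** Drop the HTTP connection without reading the rest of the body. */
        void abort();

        /** Drain the remaining bytes so the connection can go back to the pool. */
        @Override
        void close();
    }

    /** Assumed cut-off for illustration only; tune it for your workload. */
    static final long DRAIN_THRESHOLD_BYTES = 64L * 1024;

    /** True when draining would cost more than opening a new connection. */
    static boolean shouldAbort(long remainingBytes) {
        return remainingBytes > DRAIN_THRESHOLD_BYTES;
    }

    /** Releases the stream and reports which path was taken. */
    static String release(AbortableStream in, long contentLength, long position) {
        long remaining = Math.max(0L, contentLength - position);
        if (shouldAbort(remaining)) {
            in.abort();
            return "abort";
        }
        in.close();
        return "close";
    }
}
```

With this shape, a seek in a multi-GB object aborts the connection, while a
stream that is nearly finished is drained and its connection is reused.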
> S3 Consumer does not close S3 Object Input Streams and this causes HTTP
> connection leaks
> ----------------------------------------------------------------------------------------
>
> Key: CAMEL-11698
> URL: https://issues.apache.org/jira/browse/CAMEL-11698
> Project: Camel
> Issue Type: Bug
> Components: camel-aws
> Affects Versions: 2.14.3, 2.19.2
> Reporter: MykhailoVlakh
> Assignee: Andrea Cosentino
> Attachments: CustomS3Consumer.java
>
>
> It looks like S3Consumer does nothing to prevent HTTP connection leaks, which
> can easily happen if an exception is thrown while it generates a batch
> of exchanges and sends them for processing. We can also lose HTTP connections
> if our route does not close the S3 object input streams, which can easily
> happen. Because of this, the S3 consumer may work for some time and then
> start failing with the following exception:
> {code}
> com.amazonaws.AmazonClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
>     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:544)
>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:273)
>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3660)
>     at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1133)
>     at com.amazonaws.services.s3.AmazonS3EncryptionClient.access$201(AmazonS3EncryptionClient.java:65)
>     at com.amazonaws.services.s3.AmazonS3EncryptionClient$S3DirectImpl.getObject(AmazonS3EncryptionClient.java:524)
>     at com.amazonaws.services.s3.internal.crypto.S3CryptoModuleAE.getObjectSecurely(S3CryptoModuleAE.java:106)
>     at com.amazonaws.services.s3.internal.crypto.CryptoModuleDispatcher.getObjectSecurely(CryptoModuleDispatcher.java:114)
>     at com.amazonaws.services.s3.AmazonS3EncryptionClient.getObject(AmazonS3EncryptionClient.java:427)
>     at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1005)
>     at org.apache.camel.component.aws.s3.S3Consumer.createExchanges(S3Consumer.java:112)
>     at org.apache.camel.component.aws.s3.S3Consumer.poll(S3Consumer.java:93)
>     at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:187)
>     at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:114)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> I found 3 weak points in the way S3Consumer is implemented:
> 1. It does not handle exceptions in the poll() method where it reads a single
> S3 object, which means that the S3 object stream can be left open forever if
> a fault occurs;
> 2. It also does not handle exceptions in the createExchanges method, where it
> populates a list of exchanges based on the list of S3 objects available in
> the bucket. If, for example, we want to consume 10 files in a poll and the
> getObject call for file 10 fails for whatever reason, the streams for the 9
> objects that are already open will be lost;
> 3. To make sure that we always close all the streams, and to avoid forcing
> the user to do this every time, the implementation of the processBatch
> method should also be improved to close all the opened streams in a finally
> block.
> To resolve issues 2 and 3 in my current project (issue 1 does not affect me
> because I do not use that feature), I implemented a custom extension of the
> native S3Consumer that I want to share with you. It should give you an idea
> of the changes that need to be applied to fix these issues. I hope it will
> be useful.
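
Points 2 and 3 of the quoted description can be sketched without any Camel or
AWS classes. This is an illustration under stated assumptions, not the attached
CustomS3Consumer.java and not the eventual Camel fix: StreamBatch, Opener,
openAll, processBatch and closeQuietly are hypothetical names, and Opener
stands in for the getObject call.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class StreamBatch {
    /** Hypothetical stand-in for "fetch the object and return its stream". */
    interface Opener {
        Closeable open(String key) throws IOException;
    }

    /** Opens every key; if one open fails, closes the streams opened so far. */
    static List<Closeable> openAll(List<String> keys, Opener opener) throws IOException {
        List<Closeable> opened = new ArrayList<>();
        try {
            for (String key : keys) {
                opened.add(opener.open(key));
            }
            return opened;
        } catch (IOException | RuntimeException e) {
            closeQuietly(opened); // do not leak the streams that were already open
            throw e;
        }
    }

    /** Processes the batch and always closes the streams, even on failure. */
    static void processBatch(List<? extends Closeable> streams,
                             Consumer<? super Closeable> processor) {
        try {
            for (Closeable c : streams) {
                processor.accept(c);
            }
        } finally {
            closeQuietly(streams);
        }
    }

    /** Closes each stream, ignoring close failures so the rest still get closed. */
    static void closeQuietly(List<? extends Closeable> streams) {
        for (Closeable c : streams) {
            try {
                c.close();
            } catch (IOException ignored) {
                // best effort: keep closing the remaining streams
            }
        }
    }
}
```

Closing a stream twice is harmless for well-behaved Closeable implementations,
so a route that already closes its own stream is not affected by the finally
block.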