[ 
https://issues.apache.org/jira/browse/CAMEL-12244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Claus Ibsen updated CAMEL-12244:
--------------------------------
    Fix Version/s: 2.21.0

> RemoteFileProducer stopped instead of being released to the pool when 
> "interceptSendToEndpoint" is used
> -------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-12244
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12244
>             Project: Camel
>          Issue Type: Bug
>    Affects Versions: 2.19.0
>            Reporter: Krzysztof Szafrański
>            Priority: Major
>             Fix For: 2.21.0
>
>
> In our application we're using an SFTP producer with "fileExist=Move" and a 
> specific "moveExisting" expression. I encountered a problem where this would 
> sometimes work, and sometimes not (i.e. there would be no ".archived" file). 
> Upon further investigation I found the problem and it seems to be a bug in 
> Camel.
> Our SFTP endpoint looks like this:
> {code:none}
> sftp://...:.../...?username=...&privateKeyPassphrase=...&privateKeyFile=...&useUserKnownHostsFile=false&jschLoggingLevel=ERROR&fileExist=Move&moveExisting=${file:name}.archived${date:now:yyyyMMddHHmmssSSS}
> {code}
> We also have an interceptor:
> {code:none}
> route.interceptSendToEndpoint("sftp://.*").process(exchange -> 
> LOG.info("Sending file {} to {}", ...));
> {code}
> As I discovered, using the interceptor wraps the RemoteFileProducer with 
> InterceptSendToEndpoint. This however changes the behavior of the 
> ProducerCache:
> {code}
> public boolean doInAsyncProducer(...) {
>     ...
>     return producerCallback.doInAsyncProducer(producer, asyncProcessor,
>             exchange, pattern, doneSync -> {
>         ...
>         if (producer instanceof ServicePoolAware) {
>             // release back to the pool
>             pool.release(endpoint, producer);
>         } else if (!producer.isSingleton()) {
>             // stop and shutdown non-singleton producers as we should not
>             // leak resources
>             try {
>                 ServiceHelper.stopAndShutdownService(producer);
>             } catch (Exception e) {
>                 ...
>             }
>         }
>         ...
>     });
>     ...
> }
> {code}
> RemoteFileProducer implements ServicePoolAware so it would normally go back 
> to the pool, but InterceptSendToEndpoint _does not_. As a result, our 
> producers keep getting stopped (note that RemoteFileProducer#isSingleton 
> always returns false).
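
The effect of the wrapper on the type check can be reproduced without Camel. The following is a minimal sketch, not Camel's code: all types here (Producer, ServicePoolAware, PooledProducer, InterceptingProducer) are hypothetical stand-ins, and it assumes the wrapper delegates isSingleton() to the wrapped producer, as the description above implies.

```java
// Hypothetical stand-ins for the Camel types; names are illustrative only.
interface Producer {
    boolean isSingleton();
}

// Marker interface: producers carrying it are meant to go back to a pool.
interface ServicePoolAware {
}

// Plays the role of RemoteFileProducer: poolable and never a singleton.
class PooledProducer implements Producer, ServicePoolAware {
    public boolean isSingleton() {
        return false;
    }
}

// Plays the role of the intercepting wrapper: it delegates to the real
// producer but does not carry the marker interface itself.
class InterceptingProducer implements Producer {
    private final Producer delegate;

    InterceptingProducer(Producer delegate) {
        this.delegate = delegate;
    }

    public boolean isSingleton() {
        return delegate.isSingleton();
    }
}

class PoolCheckSketch {
    // Mirrors the branch structure of the callback quoted above.
    static String afterSend(Producer producer) {
        if (producer instanceof ServicePoolAware) {
            return "released";
        } else if (!producer.isSingleton()) {
            return "stopped";
        }
        return "kept";
    }

    public static void main(String[] args) {
        Producer real = new PooledProducer();
        System.out.println(afterSend(real));
        System.out.println(afterSend(new InterceptingProducer(real)));
    }
}
```

In this sketch the bare producer takes the "released" branch, while the same producer behind the wrapper takes the "stopped" branch, because instanceof only sees the wrapper's own type.
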
> What's more, the stopped producers somehow _are_ being reused, and in the 
> end we run into situations where one thread is closing a producer while 
> another thread is trying to write with it.
> I set up some breakpoints that log the thread name and 
> System#identityHashCode of the producer:
> {code}
> 2018-02-08 15:05:25.070 TRACE o.a.c.c.file.remote.RemoteFileProducer     : 
> Starting producer: RemoteFileProducer[...]
> 2018-02-08 15:05:25.073 TRACE o.a.c.c.file.remote.RemoteFileProducer     : 
> Processing file: [my_file] for exchange: ...
> 2018-02-08 15:05:25.073 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : 
> Not already connected/logged in. Connecting to: ...
> doStop(), time: 1518098725112,  thread [Camel (camel-1) thread #35 - 
> CamelInvocationHandler], producer: 889747012
>       at 
> org.apache.camel.component.file.remote.RemoteFileProducer.doStop(RemoteFileProducer.java:175)
>       at org.apache.camel.support.ServiceSupport.stop(ServiceSupport.java:102)
>       at 
> org.apache.camel.util.ServiceHelper.stopService(ServiceHelper.java:142)
>       at 
> org.apache.camel.impl.InterceptSendToEndpoint$1.stop(InterceptSendToEndpoint.java:196)
>       at 
> org.apache.camel.support.ServiceSupport.shutdown(ServiceSupport.java:164)
>       at 
> org.apache.camel.util.ServiceHelper.stopAndShutdownService(ServiceHelper.java:211)
>       at 
> org.apache.camel.impl.ProducerCache.lambda$doInAsyncProducer$2(ProducerCache.java:450)
>       at 
> org.apache.camel.processor.SendProcessor$2$1.done(SendProcessor.java:178)
>       at 
> org.apache.camel.impl.InterceptSendToEndpoint$1.process(InterceptSendToEndpoint.java:171)
>       at 
> org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:173)
>       at 
> org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:436)
>       at 
> org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168)
>       at 
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>       at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>       at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>       at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
>       at 
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>       at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>       at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>       at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>       at 
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>       at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>       at 
> org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:695)
>       at 
> org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:623)
>       at 
> org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:247)
>       at org.apache.camel.processor.Splitter.process(Splitter.java:114)
>       at 
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>       at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>       at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>       at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
>       at 
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>       at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>       at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>       at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
>       at 
> org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:47)
>       at org.apache.camel.impl.DeferProducer.process(DeferProducer.java:72)
>       at 
> org.apache.camel.component.bean.AbstractCamelInvocationHandler$1.call(AbstractCamelInvocationHandler.java:192)
>       at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>       at java.util.concurrent.FutureTask.run(FutureTask.java)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
>       at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>       at java.util.concurrent.FutureTask.run(FutureTask.java)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:748)
> 2018-02-08 15:05:25.966 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : 
> Connected and logged in to: ...
> 2018-02-08 15:05:25.966 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : 
> Disconnecting from: ...
> 2018-02-08 15:05:25.973 TRACE o.a.c.c.file.remote.RemoteFileProducer     : 
> About to write [my_file] to [...] from exchange [...]
> 2018-02-08 15:05:25.974 TRACE o.a.c.c.file.remote.RemoteFileProducer     : 
> Stopping producer: RemoteFileProducer[...]
> 2018-02-08 15:05:25.974 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : 
> Starting
> 2018-02-08 15:05:25.974 TRACE o.a.c.c.file.remote.RemoteFileProducer     : 
> Starting producer: RemoteFileProducer[...]
> 2018-02-08 15:05:25.977 TRACE o.a.c.c.file.remote.RemoteFileProducer     : 
> Processing file: [another_file] for exchange: Exchange[...]
> 2018-02-08 15:05:25.977 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : 
> Not already connected/logged in. Connecting to: ...
> handleFailedWrite(), time: 1518098726072, thread [Camel (camel-1) thread #37 
> - CamelInvocationHandler], producer: 889747012
>       at 
> org.apache.camel.component.file.remote.RemoteFileProducer.handleFailedWrite(RemoteFileProducer.java:81)
>       at 
> org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:227)
>       at 
> org.apache.camel.component.file.remote.RemoteFileProducer.process(RemoteFileProducer.java:58)
>       at 
> org.apache.camel.impl.InterceptSendToEndpoint$1.process(InterceptSendToEndpoint.java:167)
>       at 
> org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:173)
>       at 
> org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:436)
>       at 
> org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168)
>       at 
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>       at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>       at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>       at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
>       at 
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>       at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>       at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>       at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>       at 
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>       at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>       at 
> org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:695)
>       at 
> org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:623)
>       at 
> org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:247)
>       at org.apache.camel.processor.Splitter.process(Splitter.java:114)
>       at 
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>       at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>       at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>       at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
>       at 
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>       at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>       at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>       at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
>       at 
> org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:47)
>       at org.apache.camel.impl.DeferProducer.process(DeferProducer.java:72)
>       at 
> org.apache.camel.component.bean.AbstractCamelInvocationHandler$1.call(AbstractCamelInvocationHandler.java:192)
>       at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>       at java.util.concurrent.FutureTask.run(FutureTask.java)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
>       at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>       at java.util.concurrent.FutureTask.run(FutureTask.java)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.camel.component.file.GenericFileOperationFailedException: Cannot 
> change directory to: [my_directory]
>       at 
> org.apache.camel.component.file.remote.SftpOperations.doChangeDirectory(SftpOperations.java:596)
>       at 
> org.apache.camel.component.file.remote.SftpOperations.changeCurrentDirectory(SftpOperations.java:584)
>       at 
> org.apache.camel.component.file.remote.SftpOperations.storeFile(SftpOperations.java:830)
>       at 
> org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:277)
>       at 
> org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:165)
>       ... 39 more
> Caused by: 4:
>       at com.jcraft.jsch.ChannelSftp.cd(ChannelSftp.java:359)
>       at 
> org.apache.camel.component.file.remote.SftpOperations.doChangeDirectory(SftpOperations.java:594)
>       ... 43 more
> Caused by: java.io.IOException: Pipe closed
>       at java.io.PipedInputStream.read(PipedInputStream.java:307)
>       at 
> com.jcraft.jsch.Channel$MyPipedInputStream.updateReadSide(Channel.java:362)
>       at com.jcraft.jsch.ChannelSftp.cd(ChannelSftp.java:337)
>       ... 44 more
> 2018-02-08 15:05:26.186 DEBUG o.a.c.c.file.remote.RemoteFileProducer     : 
> Exception occurred during stopping: Cannot change directory to: [my_directory]
> {code}
> So thread #35 stopped the producer, while thread #37 was trying to use it.
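
The interleaving seen in the log can be forced deterministically in a small sketch. This is an illustration only, assuming a hypothetical Connection class in place of the real SFTP channel; the latches exist purely to pin down the order (writer is ready, stopper closes, writer writes) and do not reflect how Camel schedules the threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class SharedProducerRaceSketch {
    // Hypothetical stand-in for a producer's connection; close() plays
    // the role of the producer being stopped.
    static class Connection {
        private volatile boolean open = true;

        void close() {
            open = false;
        }

        void write(String file) {
            if (!open) {
                throw new IllegalStateException("Pipe closed");
            }
        }
    }

    // Forces the order: writer is ready, stopper closes, writer writes.
    static String run() {
        Connection shared = new Connection();
        CountDownLatch writerReady = new CountDownLatch(1);
        CountDownLatch stopped = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>("written");

        Thread writer = new Thread(() -> {
            writerReady.countDown();
            try {
                stopped.await();
                shared.write("another_file");
            } catch (IllegalStateException e) {
                outcome.set("failed: " + e.getMessage());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread stopper = new Thread(() -> {
            try {
                writerReady.await();
                shared.close();
                stopped.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        writer.start();
        stopper.start();
        try {
            writer.join();
            stopper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because both threads share one instance, the write fails even though the writer itself never stopped anything, which matches the shape of the failure in the log above.
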
> One more ugly thing about it is that when SftpOperations fails due to a 
> closed pipe, by the time we get to RemoteFileProducer#handleFailedWrite:
> {code}
> public void handleFailedWrite(...) throws Exception {
>     ...
>     if (isStopping() || isStopped()) {
>         // if we are stopping then ignore any exception during a poll
>         log.debug("Exception occurred during stopping: "
>                 + exception.getMessage());
>     } else {
>         log.warn("Writing file failed with: " + exception.getMessage());
>         ...
>         throw exception;
>     }
> }
> {code}
> the producer is already stopped, *so the exception is logged at DEBUG level 
> and not rethrown*.
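
The consequence of that guard is that the caller sees a successful send. A minimal sketch of the effect, using a hypothetical StoppableWriter class that copies only the branch structure of the method quoted above (it is not the Camel implementation):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class SwallowedFailureSketch {
    // Hypothetical writer with the same guard as the quoted method.
    static class StoppableWriter {
        private volatile boolean stopped;
        final List<String> debugLog = new ArrayList<>();

        void stop() {
            stopped = true;
        }

        void handleFailedWrite(Exception exception) throws Exception {
            if (stopped) {
                // Failure is only recorded, never reported to the caller.
                debugLog.add("Exception occurred during stopping: "
                        + exception.getMessage());
            } else {
                throw exception;
            }
        }
    }

    // Reports whether a write failure reaches the caller.
    static String attemptWrite(boolean stoppedFirst) {
        StoppableWriter writer = new StoppableWriter();
        if (stoppedFirst) {
            writer.stop();
        }
        try {
            writer.handleFailedWrite(new IOException("Pipe closed"));
            return "swallowed";
        } catch (Exception e) {
            return "propagated";
        }
    }

    public static void main(String[] args) {
        System.out.println(attemptWrite(false));
        System.out.println(attemptWrite(true));
    }
}
```

With the writer running, the failure propagates; once another thread has stopped it, the identical failure is swallowed and the file is silently lost.
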
> Note that I'm writing multiple files in parallel (three in my case). I use 
> the following to send data to the route that ends in the SFTP endpoint:
> {code}
> @Produce(uri = "direct:myDir")
> private MyDir myDir;
> ...
> myDir.sendAsync(...)
> {code}
> where
> {code}
> public interface MyDir {
>     Future<?> sendAsync(...);
> }
> {code}
> We're using Camel 2.19.0, but as far as I can tell from the GitHub 
> repository, the issue is most likely present in the current version too.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
