[ https://issues.apache.org/jira/browse/CAMEL-12244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Krzysztof Szafrański updated CAMEL-12244:
-----------------------------------------
    Description:

In our application we're using an SFTP producer with "fileExist=Move" and a specific "moveExisting" expression. I encountered a problem where this would sometimes work and sometimes not (i.e. there would be no ".archived" file). Upon further investigation I found the cause, and it seems to be a bug in Camel.

Our SFTP endpoint looks like this:
{code:none}
sftp://...:.../...?username=...&privateKeyPassphrase=...&privateKeyFile=...&useUserKnownHostsFile=false&jschLoggingLevel=ERROR&fileExist=Move&moveExisting=${file:name}.archived${date:now:yyyyMMddHHmmssSSS}
{code}

We also have an interceptor:
{code:none}
route.interceptSendToEndpoint("sftp://.*").process(exchange -> LOG.info("Sending file {} to {}", ...));
{code}

As I discovered, using the interceptor wraps the RemoteFileProducer with InterceptSendToEndpoint. This, however, changes the behavior of the ProducerCache:
{code}
public boolean doInAsyncProducer(...) {
    ...
    return producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, doneSync -> {
        ...
        if (producer instanceof ServicePoolAware) {
            // release back to the pool
            pool.release(endpoint, producer);
        } else if (!producer.isSingleton()) {
            // stop and shutdown non-singleton producers as we should not leak resources
            try {
                ServiceHelper.stopAndShutdownService(producer);
            } catch (Exception e) {
                ...
            }
        }
        ...
    });
    ...
}
{code}

RemoteFileProducer implements ServicePoolAware so it would normally go back to the pool, but InterceptSendToEndpoint _does not_. As a result, our producers keep getting stopped (note that RemoteFileProducer#isSingleton always returns false).

What's more, somehow they _are_ being reused, and in the end we run into situations where one thread is closing a producer while another thread is trying to write with it.

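The branch quoted from ProducerCache can be reproduced in isolation. The following is only a minimal sketch: Poolable, SketchProducer, PooledProducer, WrappingProducer and ReleaseSketch are hypothetical stand-ins, not the real Camel types, and the sketch assumes that the wrapper delegates isSingleton() and stop() to the producer it wraps. It shows that a wrapper which does not carry the marker interface falls through to the "stop" branch and stops the wrapped producer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker interface, playing the role of ServicePoolAware.
interface Poolable { }

// Hypothetical, reduced producer contract (not the Camel Producer interface).
interface SketchProducer {
    boolean isSingleton();
    void stop();
}

// Pooled, non-singleton producer, analogous to RemoteFileProducer in the report.
class PooledProducer implements SketchProducer, Poolable {
    boolean stopped;
    public boolean isSingleton() { return false; }
    public void stop() { stopped = true; }
}

// Delegating wrapper WITHOUT the marker interface, analogous to the
// producer created by the interceptor (assumption: it delegates both calls).
class WrappingProducer implements SketchProducer {
    private final SketchProducer delegate;
    WrappingProducer(SketchProducer delegate) { this.delegate = delegate; }
    public boolean isSingleton() { return delegate.isSingleton(); }
    public void stop() { delegate.stop(); }
}

class ReleaseSketch {
    static final List<SketchProducer> pool = new ArrayList<>();

    // Same shape as the quoted branch: release, stop, or leave untouched.
    static String onDone(SketchProducer producer) {
        if (producer instanceof Poolable) {
            pool.add(producer);          // release back to the pool
            return "released";
        } else if (!producer.isSingleton()) {
            producer.stop();             // stop non-singleton producers
            return "stopped";
        }
        return "kept";
    }

    public static void main(String[] args) {
        PooledProducer direct = new PooledProducer();
        System.out.println(onDone(direct));

        PooledProducer wrapped = new PooledProducer();
        System.out.println(onDone(new WrappingProducer(wrapped)));
        System.out.println("wrapped producer stopped: " + wrapped.stopped);
    }
}
```

In this sketch the instanceof check only ever sees the outer object, so whether the inner producer is poolable plays no part in the decision.
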
I set up some breakpoints that log the thread name and System#identityHashCode of the producer: {code} 2018-02-08 15:05:25.070 TRACE o.a.c.c.file.remote.RemoteFileProducer : Starting producer: RemoteFileProducer[...] 2018-02-08 15:05:25.073 TRACE o.a.c.c.file.remote.RemoteFileProducer : Processing file: [my_file] for exchange: ... 2018-02-08 15:05:25.073 DEBUG o.a.c.c.file.remote.RemoteFileProducer : Not already connected/logged in. Connecting to: ... doStop(), time: 1518098725112, thread [Camel (camel-1) thread #35 - CamelInvocationHandler], producer: 889747012 at org.apache.camel.component.file.remote.RemoteFileProducer.doStop(RemoteFileProducer.java:175) at org.apache.camel.support.ServiceSupport.stop(ServiceSupport.java:102) at org.apache.camel.util.ServiceHelper.stopService(ServiceHelper.java:142) at org.apache.camel.impl.InterceptSendToEndpoint$1.stop(InterceptSendToEndpoint.java:196) at org.apache.camel.support.ServiceSupport.shutdown(ServiceSupport.java:164) at org.apache.camel.util.ServiceHelper.stopAndShutdownService(ServiceHelper.java:211) at org.apache.camel.impl.ProducerCache.lambda$doInAsyncProducer$2(ProducerCache.java:450) at org.apache.camel.processor.SendProcessor$2$1.done(SendProcessor.java:178) at org.apache.camel.impl.InterceptSendToEndpoint$1.process(InterceptSendToEndpoint.java:171) at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:173) at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:436) at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168) at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110) at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541) at 
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198) at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541) at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198) at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:695) at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:623) at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:247) at org.apache.camel.processor.Splitter.process(Splitter.java:114) at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110) at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541) at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198) at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97) at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:47) at org.apache.camel.impl.DeferProducer.process(DeferProducer.java:72) at org.apache.camel.component.bean.AbstractCamelInvocationHandler$1.call(AbstractCamelInvocationHandler.java:192) at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) at java.util.concurrent.FutureTask.run(FutureTask.java) at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511) at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java) at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) at java.util.concurrent.FutureTask.run(FutureTask.java) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) 2018-02-08 15:05:25.966 DEBUG o.a.c.c.file.remote.RemoteFileProducer : Connected and logged in to: ... 2018-02-08 15:05:25.966 DEBUG o.a.c.c.file.remote.RemoteFileProducer : Disconnecting from: ... 2018-02-08 15:05:25.973 TRACE o.a.c.c.file.remote.RemoteFileProducer : About to write [my_file] to [...] from exchange [...] 2018-02-08 15:05:25.974 TRACE o.a.c.c.file.remote.RemoteFileProducer : Stopping producer: RemoteFileProducer[...] 2018-02-08 15:05:25.974 DEBUG o.a.c.c.file.remote.RemoteFileProducer : Starting 2018-02-08 15:05:25.974 TRACE o.a.c.c.file.remote.RemoteFileProducer : Starting producer: RemoteFileProducer[...] 2018-02-08 15:05:25.977 TRACE o.a.c.c.file.remote.RemoteFileProducer : Processing file: [another_file] for exchange: Exchange[...] 2018-02-08 15:05:25.977 DEBUG o.a.c.c.file.remote.RemoteFileProducer : Not already connected/logged in. Connecting to: ... 
handleFailedWrite(), time: 1518098726072, thread [Camel (camel-1) thread #37 - CamelInvocationHandler], producer: 889747012 at org.apache.camel.component.file.remote.RemoteFileProducer.handleFailedWrite(RemoteFileProducer.java:81) at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:227) at org.apache.camel.component.file.remote.RemoteFileProducer.process(RemoteFileProducer.java:58) at org.apache.camel.impl.InterceptSendToEndpoint$1.process(InterceptSendToEndpoint.java:167) at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:173) at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:436) at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168) at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110) at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541) at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198) at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541) at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198) at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:695) at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:623) at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:247) at org.apache.camel.processor.Splitter.process(Splitter.java:114) at 
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110) at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541) at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198) at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97) at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:47) at org.apache.camel.impl.DeferProducer.process(DeferProducer.java:72) at org.apache.camel.component.bean.AbstractCamelInvocationHandler$1.call(AbstractCamelInvocationHandler.java:192) at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) at java.util.concurrent.FutureTask.run(FutureTask.java) at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java) at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) at java.util.concurrent.FutureTask.run(FutureTask.java) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.camel.component.file.GenericFileOperationFailedException: Cannot change directory to: [my_directory] at org.apache.camel.component.file.remote.SftpOperations.doChangeDirectory(SftpOperations.java:596) at org.apache.camel.component.file.remote.SftpOperations.changeCurrentDirectory(SftpOperations.java:584) at org.apache.camel.component.file.remote.SftpOperations.storeFile(SftpOperations.java:830) 
	at org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:277)
	at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:165)
	... 39 more
Caused by: 4:
	at com.jcraft.jsch.ChannelSftp.cd(ChannelSftp.java:359)
	at org.apache.camel.component.file.remote.SftpOperations.doChangeDirectory(SftpOperations.java:594)
	... 43 more
Caused by: java.io.IOException: Pipe closed
	at java.io.PipedInputStream.read(PipedInputStream.java:307)
	at com.jcraft.jsch.Channel$MyPipedInputStream.updateReadSide(Channel.java:362)
	at com.jcraft.jsch.ChannelSftp.cd(ChannelSftp.java:337)
	... 44 more
2018-02-08 15:05:26.186 DEBUG o.a.c.c.file.remote.RemoteFileProducer : Exception occurred during stopping: Cannot change directory to: [my_directory]
{code}

So thread #35 stopped the producer while thread #37 was trying to use it.

One more ugly thing about it is that when SftpOperations fails due to a closed pipe, by the time we get to RemoteFileProducer#handleFailedWrite:
{code}
public void handleFailedWrite(...) throws Exception {
    ...
    if (isStopping() || isStopped()) {
        // if we are stopping then ignore any exception during a poll
        log.debug("Exception occurred during stopping: " + exception.getMessage());
    } else {
        log.warn("Writing file failed with: " + exception.getMessage());
        ...
        throw exception;
    }
}
{code}
the producer is already stopped, *so the exception is logged at DEBUG and not rethrown*.

Note that I'm writing multiple files in parallel (three in my case). I'm using this to send data to the route ending in the SFTP endpoint:
{code}
@Produce(uri = "direct:myDir")
private MyDir myDir;
...
myDir.sendAsync(...)
{code}
where
{code}
public interface MyDir {
    Future<?> sendAsync(...);
}
{code}

We're using Camel 2.19.0, but as far as I can tell from the GitHub repository, the issue is most likely present in the current version too.

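The swallowed failure can also be shown in isolation. This is a minimal sketch only: SwallowSketch, its stopped flag and writeReportedAsFailed are hypothetical names, and the flag stands in for the isStopping()/isStopped() checks in the handleFailedWrite excerpt. It is not the Camel implementation. The point is that the same "Pipe closed" failure reaches the caller or disappears depending solely on the lifecycle flag that another thread may have flipped.

```java
import java.io.IOException;

class SwallowSketch {
    // Hypothetical stand-in for the producer's lifecycle state.
    static volatile boolean stopped;

    // Same shape as the excerpt: rethrow only while the producer is running.
    static void handleFailedWrite(Exception exception) throws Exception {
        if (stopped) {
            // Logged quietly and dropped; the caller never learns about it.
            System.out.println("DEBUG Exception occurred during stopping: " + exception.getMessage());
        } else {
            System.out.println("WARN Writing file failed with: " + exception.getMessage());
            throw exception;
        }
    }

    // Returns true if the caller gets to see the write failure.
    static boolean writeReportedAsFailed(boolean stoppedByOtherThread) {
        stopped = stoppedByOtherThread;
        try {
            handleFailedWrite(new IOException("Pipe closed"));
            return false;
        } catch (Exception e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("running producer, failure visible: " + writeReportedAsFailed(false));
        System.out.println("stopped producer, failure visible: " + writeReportedAsFailed(true));
    }
}
```
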
> RemoteFileProducer stopped instead of being released to the pool when "interceptSendToEndpoint" is used
> -------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-12244
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12244
>             Project: Camel
>          Issue Type: Bug
>    Affects Versions: 2.19.0
>            Reporter: Krzysztof Szafrański
>            Priority: Major
>
> In our application we're using an SFTP producer with "fileExist=Move" and a specific "moveExisting" expression. I encountered a problem where this would sometimes work, and sometimes not (i.e. there would be no ".archived" file). Upon further investigation I found the problem and it seems to be a bug in Camel.
> Our SFTP endpoint looks like this:
> {code:none}
> sftp://...:.../...?username=...&privateKeyPassphrase=...&privateKeyFile=...&useUserKnownHostsFile=false&jschLoggingLevel=ERROR&fileExist=Move&moveExisting=${file:name}.archived${date:now:yyyyMMddHHmmssSSS}
> {code}
> We also have an interceptor:
> {code:none}
> route.interceptSendToEndpoint("sftp://.*").process(exchange -> LOG.info("Sending file {} to {}", ...));
> {code}
> As I discovered, using the interceptor wraps the RemoteFileProducer with InterceptSendToEndpoint. This however changes the behavior of the ProducerCache:
> {code}
> public boolean doInAsyncProducer(...) {
>     ...
>     return producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, doneSync -> {
>         ...
>         if (producer instanceof ServicePoolAware) {
>             // release back to the pool
>             pool.release(endpoint, producer);
>         } else if (!producer.isSingleton()) {
>             // stop and shutdown non-singleton producers as we should not leak resources
>             try {
>                 ServiceHelper.stopAndShutdownService(producer);
>             } catch (Exception e) {
>                 ...
>             }
>         }
>         ...
>     });
>     ...
> }
> {code}
> RemoteFileProducer implements ServicePoolAware so it would normally go back to the pool, but InterceptSendToEndpoint _does not_.
As a result, our > producers keep getting stopped (note that RemoteFileProducer#isSingleton > always returns false). > What's more, somehow they _are_ being reused and in the end we run into > situations, where one thread is closing a producer, while another thread is > trying to write with it. > I set up some breakpoints that log the thread name and > System#identityHashCode of the producer: > {code} > 2018-02-08 15:05:25.070 TRACE o.a.c.c.file.remote.RemoteFileProducer : > Starting producer: RemoteFileProducer[...] > 2018-02-08 15:05:25.073 TRACE o.a.c.c.file.remote.RemoteFileProducer : > Processing file: [my_file] for exchange: ... > 2018-02-08 15:05:25.073 DEBUG o.a.c.c.file.remote.RemoteFileProducer : > Not already connected/logged in. Connecting to: ... > doStop(), time: 1518098725112, thread [Camel (camel-1) thread #35 - > CamelInvocationHandler], producer: 889747012 > at > org.apache.camel.component.file.remote.RemoteFileProducer.doStop(RemoteFileProducer.java:175) > at org.apache.camel.support.ServiceSupport.stop(ServiceSupport.java:102) > at > org.apache.camel.util.ServiceHelper.stopService(ServiceHelper.java:142) > at > org.apache.camel.impl.InterceptSendToEndpoint$1.stop(InterceptSendToEndpoint.java:196) > at > org.apache.camel.support.ServiceSupport.shutdown(ServiceSupport.java:164) > at > org.apache.camel.util.ServiceHelper.stopAndShutdownService(ServiceHelper.java:211) > at > org.apache.camel.impl.ProducerCache.lambda$doInAsyncProducer$2(ProducerCache.java:450) > at > org.apache.camel.processor.SendProcessor$2$1.done(SendProcessor.java:178) > at > org.apache.camel.impl.InterceptSendToEndpoint$1.process(InterceptSendToEndpoint.java:171) > at > org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:173) > at > org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:436) > at > org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168) > at > 
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>     at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
>     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>     at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:695)
>     at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:623)
>     at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:247)
>     at org.apache.camel.processor.Splitter.process(Splitter.java:114)
>     at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>     at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
>     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>     at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
>     at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:47)
>     at org.apache.camel.impl.DeferProducer.process(DeferProducer.java:72)
>     at org.apache.camel.component.bean.AbstractCamelInvocationHandler$1.call(AbstractCamelInvocationHandler.java:192)
>     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>     at java.util.concurrent.FutureTask.run(FutureTask.java)
>     at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
>     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>     at java.util.concurrent.FutureTask.run(FutureTask.java)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> 2018-02-08 15:05:25.966 DEBUG o.a.c.c.file.remote.RemoteFileProducer : Connected and logged in to: ...
> 2018-02-08 15:05:25.966 DEBUG o.a.c.c.file.remote.RemoteFileProducer : Disconnecting from: ...
> 2018-02-08 15:05:25.973 TRACE o.a.c.c.file.remote.RemoteFileProducer : About to write [my_file] to [...] from exchange [...]
> 2018-02-08 15:05:25.974 TRACE o.a.c.c.file.remote.RemoteFileProducer : Stopping producer: RemoteFileProducer[...]
> 2018-02-08 15:05:25.974 DEBUG o.a.c.c.file.remote.RemoteFileProducer : Starting
> 2018-02-08 15:05:25.974 TRACE o.a.c.c.file.remote.RemoteFileProducer : Starting producer: RemoteFileProducer[...]
> 2018-02-08 15:05:25.977 TRACE o.a.c.c.file.remote.RemoteFileProducer : Processing file: [another_file] for exchange: Exchange[...]
> 2018-02-08 15:05:25.977 DEBUG o.a.c.c.file.remote.RemoteFileProducer : Not already connected/logged in. Connecting to: ...
> handleFailedWrite(), time: 1518098726072, thread [Camel (camel-1) thread #37 - CamelInvocationHandler], producer: 889747012
>     at org.apache.camel.component.file.remote.RemoteFileProducer.handleFailedWrite(RemoteFileProducer.java:81)
>     at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:227)
>     at org.apache.camel.component.file.remote.RemoteFileProducer.process(RemoteFileProducer.java:58)
>     at org.apache.camel.impl.InterceptSendToEndpoint$1.process(InterceptSendToEndpoint.java:167)
>     at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:173)
>     at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:436)
>     at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168)
>     at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>     at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
>     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>     at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:695)
>     at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:623)
>     at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:247)
>     at org.apache.camel.processor.Splitter.process(Splitter.java:114)
>     at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>     at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
>     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
>     at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
>     at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:47)
>     at org.apache.camel.impl.DeferProducer.process(DeferProducer.java:72)
>     at org.apache.camel.component.bean.AbstractCamelInvocationHandler$1.call(AbstractCamelInvocationHandler.java:192)
>     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>     at java.util.concurrent.FutureTask.run(FutureTask.java)
>     at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
>     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>     at java.util.concurrent.FutureTask.run(FutureTask.java)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.camel.component.file.GenericFileOperationFailedException: Cannot change directory to: [my_directory]
>     at org.apache.camel.component.file.remote.SftpOperations.doChangeDirectory(SftpOperations.java:596)
>     at org.apache.camel.component.file.remote.SftpOperations.changeCurrentDirectory(SftpOperations.java:584)
>     at org.apache.camel.component.file.remote.SftpOperations.storeFile(SftpOperations.java:830)
>     at org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:277)
>     at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:165)
>     ... 39 more
> Caused by: 4:
>     at com.jcraft.jsch.ChannelSftp.cd(ChannelSftp.java:359)
>     at org.apache.camel.component.file.remote.SftpOperations.doChangeDirectory(SftpOperations.java:594)
>     ... 43 more
> Caused by: java.io.IOException: Pipe closed
>     at java.io.PipedInputStream.read(PipedInputStream.java:307)
>     at com.jcraft.jsch.Channel$MyPipedInputStream.updateReadSide(Channel.java:362)
>     at com.jcraft.jsch.ChannelSftp.cd(ChannelSftp.java:337)
>     ... 44 more
> 2018-02-08 15:05:26.186 DEBUG o.a.c.c.file.remote.RemoteFileProducer : Exception occurred during stopping: Cannot change directory to: [my_directory]
> {code}
> So thread #35 stopped the producer while thread #37 was still trying to use it.
> Another ugly aspect is that when SftpOperations fails due to a closed pipe, by the time we get to RemoteFileProducer#handleFailedWrite:
> {code}
> public void handleFailedWrite(...) throws Exception {
>     ...
>     if (isStopping() || isStopped()) {
>         // if we are stopping then ignore any exception during a poll
>         log.debug("Exception occurred during stopping: " + exception.getMessage());
>     } else {
>         log.warn("Writing file failed with: " + exception.getMessage());
>         ...
>         throw exception;
>     }
> }
> {code}
> the producer is already stopped, *so the exception is logged at DEBUG and not rethrown*.
> Note that I'm writing multiple files in parallel (three in my case). I'm using this to send data to the route ending in the SFTP endpoint:
> {code}
> @Produce(uri = "direct:myDir")
> private MyDir myDir;
> ...
> myDir.sendAsync(...)
> {code}
> where
> {code}
> public interface MyDir {
>     Future<?> sendAsync(...);
> }
> {code}
> We're using Camel 2.19.0, but as far as I can tell from the GitHub repository, the issue is most likely present in the current version too.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
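To illustrate the swallowed-failure path described in the report, here is a minimal sketch in plain Java with no Camel dependency. `SketchProducer` and `StoppedProducerSketch` are hypothetical names introduced only for this illustration, and the boolean stop flag merely imitates the `isStopping() || isStopped()` check quoted in the report; it is not Camel's implementation. The sketch replays the observed ordering sequentially (stop first, failed write second) rather than reproducing the actual thread race.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a shared producer; NOT Camel's RemoteFileProducer.
class SketchProducer {
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    // Imitates another thread's completion callback shutting the producer down.
    void stop() {
        stopped.set(true);
    }

    boolean isStopped() {
        return stopped.get();
    }

    // Same shape as the handleFailedWrite quoted in the report:
    // swallow the exception when stopped, rethrow it otherwise.
    // Returns true when the exception was swallowed.
    boolean handleFailedWrite(Exception exception) throws Exception {
        if (isStopped()) {
            System.out.println("DEBUG Exception occurred during stopping: " + exception.getMessage());
            return true;
        }
        System.out.println("WARN Writing file failed with: " + exception.getMessage());
        throw exception;
    }
}

class StoppedProducerSketch {
    // Replays the ordering from the log: optionally stop the producer,
    // then let a write fail with "Pipe closed".
    static boolean failureIsSwallowed(boolean stopFirst) {
        SketchProducer producer = new SketchProducer();
        if (stopFirst) {
            producer.stop();
        }
        try {
            return producer.handleFailedWrite(new IOException("Pipe closed"));
        } catch (Exception e) {
            // The failure reached the caller, so it was not swallowed.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("stopped first: swallowed = " + failureIsSwallowed(true));
        System.out.println("still running: swallowed = " + failureIsSwallowed(false));
    }
}
```

In this sketch the caller only learns about the failed write when the producer has not been stopped in the meantime, which matches the behaviour described in the report: the file is not written, yet nothing above DEBUG level is logged.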