Hi, Mybeans.xml <http://camel.465427.n5.nabble.com/file/n5756091/Mybeans.xml>
*Issue:*
Whenever data is spooled to a file via CachedOutputStream by any Camel component in a multicast branch, that data becomes unreadable
a) in the AggregationStrategy of the multicast
b) after the multicast, if there is no aggregation strategy

We are getting:
a) FileNotFound errors, because the file is deleted on completion of the cloned branch exchange.
b) "Premature end of file" errors when we read the data from the InputStream with XMLReader / StAX.

If we use the constructor new CachedOutputStream(exchange, false), the stream cache file is not deleted. But then the file may never be cleaned up, and we do not want that.

*Details*
We are using Camel 2.13.2. I have a multicast route with an AggregationStrategy. In each multicast branch we have a custom Camel component that returns large data (around 4 MB) and writes it to a StreamCache (CachedOutputStream). We need to aggregate that data in the AggregationStrategy of the multicast.

In the aggregation strategy I need to do XPath evaluation using the Camel XPathBuilder. Hence I read the body and convert it from StreamCache to byte[] to avoid 'Error during type conversion from type: org.apache.camel.converter.stream.InputStreamCache.' in the XPathBuilder.

When I try to read the body at the beginning of the AggregationStrategy, I get the following error:

/tmp/camel/camel-tmp-4e00bf8a-4a42-463a-b046-5ea2d7fc8161/cos6047774870387520936.tmp (No such file or directory), cause: FileNotFoundException:/tmp/camel/camel-tmp-4e00bf8a-4a42-463a-b046-5ea2d7fc8161/cos6047774870387520936.tmp (No such file or directory).
    at java.io.FileInputStream.open(Native Method)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.camel.converter.stream.FileInputStreamCache.createInputStream(FileInputStreamCache.java:123)
    at org.apache.camel.converter.stream.FileInputStreamCache.getInputStream(FileInputStreamCache.java:117)
    at org.apache.camel.converter.stream.FileInputStreamCache.writeTo(FileInputStreamCache.java:93)
    at org.apache.camel.converter.stream.StreamCacheConverter.convertToByteArray(StreamCacheConverter.java:102)
    at com.sap.it.rt.camel.aggregate.strategies.MergeAtXPathAggregationStrategy.convertToByteArray(MergeAtXPathAggregationStrategy.java:169)
    at com.sap.it.rt.camel.aggregate.strategies.MergeAtXPathAggregationStrategy.convertToXpathCompatibleType(MergeAtXPathAggregationStrategy.java:161)

The following code is where the error is thrown:

    Object body = exchange.getIn().getBody();
    if (body instanceof StreamCache) {
        StreamCache cache = (StreamCache) body;
        xml = new String(convertToByteArray(cache, exchange));
        exchange.getIn().setBody(xml);
    }

<http://camel.465427.n5.nabble.com/file/n5756091/StreamCache_File_Gets_Deleted_before_Aggregation.png>

By preventing the stream cache from writing to a file, which we did by setting a threshold of 10MB in the multicast-related routes, we were able to work with the aggregation strategy. But we do not want to do that, as we may have incoming data that is bigger.

    <camel:camelContext id="multicast_xml_1" streamCache="true">
        <camel:properties>
            <camel:property key="CamelCachedOutputStreamCipherTransformation" value="RC4"/>
            <camel:property key="CamelCachedOutputStreamThreshold" value="100000000"/>
        </camel:properties>

Note: The FileNotFound issue does not appear if the StreamCache-based Camel component is used in a route with other processors but without Multicast + Aggregation.

After debugging, I could understand the issue with aggregating huge data from StreamCache with MulticastProcessor.
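To make clear why raising the threshold hides the problem, here is a minimal sketch that uses only JDK classes. It is not Camel code: SpoolDemo, SpooledBody, onDone() and readableAfterCompletion() are hypothetical names that only mimic the behaviour described in this mail (keep small bodies in memory, spool large ones to a temp file, delete that file on completion).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class SpoolDemo {

    /** Hypothetical stand-in for a spooling stream cache; NOT Camel API. */
    static final class SpooledBody {
        private final int threshold;
        private final ByteArrayOutputStream memory = new ByteArrayOutputStream();
        private Path tempFile;        // set once the threshold is exceeded
        private OutputStream fileOut; // open while the body is being written

        SpooledBody(int threshold) {
            this.threshold = threshold;
        }

        void write(byte[] data) throws IOException {
            if (tempFile == null && memory.size() + data.length > threshold) {
                // Threshold exceeded: move what is buffered so far to a temp file.
                tempFile = Files.createTempFile("cos", ".tmp");
                fileOut = Files.newOutputStream(tempFile);
                memory.writeTo(fileOut);
                memory.reset();
            }
            if (tempFile != null) {
                fileOut.write(data);
            } else {
                memory.write(data);
            }
        }

        /** Mimics the on-completion cleanup: close the stream, delete the temp file. */
        void onDone() throws IOException {
            if (fileOut != null) {
                fileOut.close();
            }
            if (tempFile != null) {
                Files.deleteIfExists(tempFile);
            }
        }

        /** Reads the whole body; fails with an IOException if the temp file is gone. */
        byte[] readAll() throws IOException {
            if (tempFile == null) {
                return memory.toByteArray();
            }
            return Files.readAllBytes(tempFile);
        }
    }

    /** Writes size bytes, runs the completion hook, then reads as an aggregator would. */
    public static boolean readableAfterCompletion(int size, int threshold) throws IOException {
        SpooledBody body = new SpooledBody(threshold);
        body.write(new byte[size]);
        body.onDone(); // completion runs BEFORE the "aggregation" read
        try {
            return body.readAll().length == size;
        } catch (IOException e) {
            return false; // temp file was already deleted
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("below threshold: " + readableAfterCompletion(1_000, 10_000));
        System.out.println("above threshold: " + readableAfterCompletion(50_000, 10_000));
    }
}
```

In this sketch the body below the threshold stays readable after completion because it never leaves memory, while the body above the threshold cannot be read any more because its temp file has already been deleted. That matches what we see: a high threshold avoids the error, but only as long as the data fits under it.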
In MulticastProcessor.java, doProcessParallel() is called, and on completion of the multicast branch exchange the CachedOutputStream deletes / cleans up the temporary file. This happens before the branch exchange reaches the aggregation strategy, which then tries to read the data from that exchange. For large data in the StreamCache, the temporary file is therefore already deleted, which leads to the FileNotFound errors.

    public CachedOutputStream(Exchange exchange, boolean closedOnCompletion) {
        this.strategy = exchange.getContext().getStreamCachingStrategy();
        currentStream = new CachedByteArrayOutputStream(strategy.getBufferSize());

        if (closedOnCompletion) {
            // add on completion so we can cleanup after the exchange is done such as deleting temporary files
            exchange.addOnCompletion(new SynchronizationAdapter() {
                @Override
                public void onDone(Exchange exchange) {
                    try {
                        if (fileInputStreamCache != null) {
                            fileInputStreamCache.close();
                        }
                        close();
                    } catch (Exception e) {
                        LOG.warn("Error deleting temporary cache file: " + tempFile, e);
                    }
                }

                @Override
                public String toString() {
                    return "OnCompletion[CachedOutputStream]";
                }
            });
        }
    }

    public void close() throws IOException {
        currentStream.close();
        cleanUpTempFile();
    }

I was able to work around the issue by setting closedOnCompletion = false when writing to the CachedOutputStream in the components used in the multicast branches. But this is a leaky solution, because the stream cache temporary file(s) may then never get cleaned up.

Can the MulticastProcessor be adjusted so that the multicast branch exchanges reach 'completion' status only after they have been aggregated at the end of the multicast?

Please help / advise on the issue, as I am new to using Camel Multicast.

--
View this message in context: http://camel.465427.n5.nabble.com/Streamcache-issue-deletion-of-spooled-files-in-multicast-sub-exchanges-before-aggregationstrategy-tp5756091.html
Sent from the Camel Development mailing list archive at Nabble.com.