This is an automated email from the ASF dual-hosted git repository.
siano pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 050db6a CAMEL-13168 - Delay StreamCache file deletion till calling LUW is done
050db6a is described below
commit 050db6ab68f4e167a85c2af77f7f7a33bc0c2521
Author: Stephan Siano <[email protected]>
AuthorDate: Thu Feb 7 12:00:52 2019 +0100
CAMEL-13168 - Delay StreamCache file deletion till calling LUW is done
---
.../camel/component/directvm/DirectVmProcessor.java | 8 ++++++++
.../apache/camel/processor/StreamCachingInOutTest.java | 17 ++++++++++++++++-
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java b/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
index 964f87f..6cc8c5a 100644
--- a/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
+++ b/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
@@ -88,6 +88,14 @@ public final class DirectVmProcessor extends DelegateAsyncProcessor {
Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext(), false);
// set the from endpoint
newExchange.setFromEndpoint(endpoint);
+ // The StreamCache created by the child routes must not be
+ // closed by the unit of work of the child route, but by the unit of
+ // work of the parent route or grand parent route or grand grand parent route ...(in case of nesting).
+ // Set therefore the unit of work of the parent route as stream cache unit of work,
+ // if it is not already set.
+ if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
+ newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, exchange.getUnitOfWork());
+ }
return newExchange;
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java
index 36adc4d..78caf8a 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java
@@ -40,13 +40,28 @@ public class StreamCachingInOutTest extends ContextTestSupport {
assertEquals(c.assertExchangeReceived(0).getIn().getBody(String.class), "James,Guillaume,Hiram,Rob,Roman");
}
+ @Test
+ public void testStreamCachingPerRouteWithDirecVM() throws Exception {
+ MockEndpoint e = getMockEndpoint("mock:e");
+ e.expectedMessageCount(1);
+
+ InputStream message = getTestFileStream();
+ template.sendBody("direct:e", message);
+
+ assertMockEndpointsSatisfied();
+ assertEquals(e.assertExchangeReceived(0).getIn().getBody(String.class), "James,Guillaume,Hiram,Rob,Roman");
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:c").noStreamCaching().to("direct:d").to("mock:c");
+ context.getStreamCachingStrategy().setSpoolThreshold(1);
+ from("direct:c").noStreamCaching().to("direct:d").convertBodyTo(String.class).to("mock:c");
from("direct:d").streamCaching().process(new TestProcessor());
+ from("direct:e").noStreamCaching().to("direct-vm:f").convertBodyTo(String.class).to("mock:e");
+ from("direct-vm:f").streamCaching().process(new TestProcessor());
}
};
}