This is an automated email from the ASF dual-hosted git repository.
siano pushed a commit to branch camel-2.22.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.22.x by this push:
new 13c4d98 CAMEL-13168 - Delay StreamCache file deletion till calling
LUW is done
13c4d98 is described below
commit 13c4d9867f3827cff083f1aa291e0ae2c5f8736e
Author: Stephan Siano <[email protected]>
AuthorDate: Thu Feb 7 12:00:52 2019 +0100
CAMEL-13168 - Delay StreamCache file deletion till calling LUW is done
---
.../camel/component/directvm/DirectVmProcessor.java | 8 ++++++++
.../apache/camel/processor/StreamCachingInOutTest.java | 17 ++++++++++++++++-
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git
a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
index 2844f6a..167c539 100644
---
a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
+++
b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
@@ -91,6 +91,14 @@ public final class DirectVmProcessor extends
DelegateAsyncProcessor {
Exchange newExchange =
ExchangeHelper.copyExchangeAndSetCamelContext(exchange,
endpoint.getCamelContext(), false);
// set the from endpoint
newExchange.setFromEndpoint(endpoint);
+ // The StreamCache created by the child routes must not be
+ // closed by the unit of work of the child route, but by the unit of
+ // work of the parent route or grand parent route or grand grand
parent route ...(in case of nesting).
+ // Set therefore the unit of work of the parent route as stream cache
unit of work,
+ // if it is not already set.
+ if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) ==
null) {
+ newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK,
exchange.getUnitOfWork());
+ }
return newExchange;
}
diff --git
a/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java
b/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java
index 2514961..1284e67 100644
---
a/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java
+++
b/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java
@@ -38,13 +38,28 @@ public class StreamCachingInOutTest extends
ContextTestSupport {
assertEquals(c.assertExchangeReceived(0).getIn().getBody(String.class),
"James,Guillaume,Hiram,Rob,Roman");
}
+ @Test
+ public void testStreamCachingPerRouteWithDirecVM() throws Exception {
+ MockEndpoint e = getMockEndpoint("mock:e");
+ e.expectedMessageCount(1);
+
+ InputStream message = getTestFileStream();
+ template.sendBody("direct:e", message);
+
+ assertMockEndpointsSatisfied();
+
assertEquals(e.assertExchangeReceived(0).getIn().getBody(String.class),
"James,Guillaume,Hiram,Rob,Roman");
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:c").noStreamCaching().to("direct:d").to("mock:c");
+ context.getStreamCachingStrategy().setSpoolThreshold(1);
+
from("direct:c").noStreamCaching().to("direct:d").convertBodyTo(String.class).to("mock:c");
from("direct:d").streamCaching().process(new TestProcessor());
+
from("direct:e").noStreamCaching().to("direct-vm:f").convertBodyTo(String.class).to("mock:e");
+ from("direct-vm:f").streamCaching().process(new
TestProcessor());
}
};
}