This is an automated email from the ASF dual-hosted git repository.

siano pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 050db6a  CAMEL-13168 - Delay StreamCache file deletion till calling 
LUW is done
050db6a is described below

commit 050db6ab68f4e167a85c2af77f7f7a33bc0c2521
Author: Stephan Siano <[email protected]>
AuthorDate: Thu Feb 7 12:00:52 2019 +0100

    CAMEL-13168 - Delay StreamCache file deletion till calling LUW is done
---
 .../camel/component/directvm/DirectVmProcessor.java     |  8 ++++++++
 .../apache/camel/processor/StreamCachingInOutTest.java  | 17 ++++++++++++++++-
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git 
a/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
 
b/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
index 964f87f..6cc8c5a 100644
--- 
a/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
+++ 
b/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
@@ -88,6 +88,14 @@ public final class DirectVmProcessor extends 
DelegateAsyncProcessor {
         Exchange newExchange = 
ExchangeHelper.copyExchangeAndSetCamelContext(exchange, 
endpoint.getCamelContext(), false);
         // set the from endpoint
         newExchange.setFromEndpoint(endpoint);
+        // The StreamCache created by the child routes must not be 
+        // closed by the unit of work of the child route, but by the unit of 
+        // work of the parent route or grand parent route or grand grand 
parent route ...(in case of nesting).
+        // Set therefore the unit of work of the  parent route as stream cache 
unit of work, 
+        // if it is not already set.
+        if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == 
null) {
+            newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, 
exchange.getUnitOfWork());
+        }
         return newExchange;
     }
 
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java
index 36adc4d..78caf8a 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingInOutTest.java
@@ -40,13 +40,28 @@ public class StreamCachingInOutTest extends 
ContextTestSupport {
         
assertEquals(c.assertExchangeReceived(0).getIn().getBody(String.class), 
"James,Guillaume,Hiram,Rob,Roman");
     }
 
+    @Test
+    public void testStreamCachingPerRouteWithDirecVM() throws Exception {
+        MockEndpoint e = getMockEndpoint("mock:e");
+        e.expectedMessageCount(1);
+
+        InputStream message = getTestFileStream();
+        template.sendBody("direct:e", message);
+
+        assertMockEndpointsSatisfied();
+        
assertEquals(e.assertExchangeReceived(0).getIn().getBody(String.class), 
"James,Guillaume,Hiram,Rob,Roman");
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:c").noStreamCaching().to("direct:d").to("mock:c");
+                context.getStreamCachingStrategy().setSpoolThreshold(1);
+                
from("direct:c").noStreamCaching().to("direct:d").convertBodyTo(String.class).to("mock:c");
                 from("direct:d").streamCaching().process(new TestProcessor());
+                
from("direct:e").noStreamCaching().to("direct-vm:f").convertBodyTo(String.class).to("mock:e");
+                from("direct-vm:f").streamCaching().process(new 
TestProcessor());
             }
         };
     }

Reply via email to