Author: gertv
Date: Mon Feb  9 20:20:24 2009
New Revision: 742719

URL: http://svn.apache.org/viewvc?rev=742719&view=rev
Log:
Merged revisions 742705 via svnmerge from 
https://svn.eu.apache.org/repos/asf/camel/trunk

........
  r742705 | gertv | 2009-02-09 20:46:50 +0100 (Mon, 09 Feb 2009) | 1 line
  
  SM-1271: StreamCachingInterceptor should support async exchange handling
........

Modified:
    camel/branches/camel-1.x/   (props changed)
    
camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
    
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
    
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java

Propchange: camel/branches/camel-1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Feb  9 20:20:24 2009
@@ -1 +1 @@
-/camel/trunk:739733,739904,740251,740295,740306,740596,740663,741848,742231
+/camel/trunk:739733,739904,740251,740295,740306,740596,740663,741848,742231,742705

Propchange: camel/branches/camel-1.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java?rev=742719&r1=742718&r2=742719&view=diff
==============================================================================
--- 
camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
 (original)
+++ 
camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
 Mon Feb  9 20:20:24 2009
@@ -18,36 +18,25 @@
 
 import java.util.List;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Processor;
 import org.apache.camel.converter.stream.StreamCache;
 import org.apache.camel.model.InterceptorRef;
 import org.apache.camel.model.InterceptorType;
-import org.apache.camel.processor.Interceptor;
+import org.apache.camel.processor.DelegateProcessor;
+import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.MessageHelper;
 
 /**
- * {...@link Interceptor} that converts a message into a re-readable format
+ * {...@link DelegateProcessor} that converts a message into a re-readable 
format
  */
-public class StreamCachingInterceptor extends Interceptor {
+public class StreamCachingInterceptor extends DelegateProcessor implements 
AsyncProcessor {
 
     public StreamCachingInterceptor() {
         super();
-        setInterceptorLogic(new Processor() {
-            public void process(Exchange exchange) throws Exception {
-                try {
-                    StreamCache newBody = 
exchange.getIn().getBody(StreamCache.class);
-                    if (newBody != null) {
-                        exchange.getIn().setBody(newBody);
-                    }
-                    MessageHelper.resetStreamCache(exchange.getIn());
-                } catch (NoTypeConversionAvailableException ex) {
-                    // ignore if in is not of StreamCache type
-                }
-                proceed(exchange);
-            }
-        });
     }
 
     public StreamCachingInterceptor(Processor processor) {
@@ -74,4 +63,37 @@
             }
         }
     }
+    
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            StreamCache newBody = exchange.getIn().getBody(StreamCache.class);
+            if (newBody != null) {
+                exchange.getIn().setBody(newBody);
+            }
+            MessageHelper.resetStreamCache(exchange.getIn());
+        } catch (NoTypeConversionAvailableException ex) {
+            // ignore if in is not of StreamCache type
+        }
+        return proceed(exchange, callback);
+    } 
+
+    public boolean proceed(Exchange exchange, AsyncCallback callback) {
+        if (getProcessor() instanceof AsyncProcessor) {
+            return ((AsyncProcessor) getProcessor()).process(exchange, 
callback);
+        } else {
+            try {
+                processor.process(exchange);
+            } catch (Throwable e) {
+                exchange.setException(e);
+            }
+            // false means processing of the exchange asynchronously,
+            callback.done(true);
+            return true;
+        }
+    }
 }

Modified: 
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java?rev=742719&r1=742718&r2=742719&view=diff
==============================================================================
--- 
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
 (original)
+++ 
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
 Mon Feb  9 20:20:24 2009
@@ -252,7 +252,7 @@
             // take off the InstrumentationProcessor
             processor = unwrapDelegateProcessor(processor);
             // take off the StreamCacheInterceptor
-            processor = unwrapInterceptor(processor);
+            processor = unwrapDelegateProcessor(processor);
             MulticastProcessor multicastProcessor = 
assertIsInstanceOf(MulticastProcessor.class, processor);
             List<Processor> endpoints = new 
ArrayList<Processor>(multicastProcessor.getProcessors());
             assertEquals("Should have 2 endpoints", 2, endpoints.size());

Modified: 
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java?rev=742719&r1=742718&r2=742719&view=diff
==============================================================================
--- 
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
 (original)
+++ 
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
 Mon Feb  9 20:20:24 2009
@@ -145,9 +145,6 @@
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                //TODO: revert this once we get DelegateProcessor to support 
async
-                setErrorHandlerBuilder(noErrorHandler());
-
                 // START SNIPPET: example
                 from("direct:a").thread(1).process(new Processor() {
                     public void process(Exchange exchange) throws Exception {


Reply via email to