Author: davsclaus
Date: Tue Oct 20 06:06:58 2009
New Revision: 826967

URL: http://svn.apache.org/viewvc?rev=826967&view=rev
Log:
CAMEL-2079: Camel now adds an exchange property holding the endpoint the exchange 
was last sent to, and another holding the failed endpoint when moving to the DLC queue

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ToEndpointPropertyTest.java
   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TracerTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=826967&r1=826966&r2=826967&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java 
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Tue Oct 
20 06:06:58 2009
@@ -58,6 +58,7 @@
     String EXCEPTION_CAUGHT     = "CamelExceptionCaught";
     String ERRORHANDLER_HANDLED = "CamelErrorHandlerHandled";
     String FAILURE_HANDLED      = "CamelFailureHandled";
+    String FAILURE_ENDPOINT     = "CamelFailureEndpoint";
 
     String FILE_LOCAL_WORK_PATH = "CamelFileLocalWorkPath";
     String FILE_NAME            = "CamelFileName";
@@ -79,6 +80,7 @@
     String HTTP_URL                = "CamelHttpUrl";
 
     String INTERCEPTED_ENDPOINT = "CamelInterceptedEndpoint";
+    String TO_ENDPOINT          = "CamelToEndpoint";
 
     String LOG_DEBUG_BODY_MAX_CHARS = "CamelLogDebugBodyMaxChars";
     String LOG_DEBUG_BODY_STREAMS   = "CamelLogDebugStreams";

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=826967&r1=826966&r2=826967&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java 
Tue Oct 20 06:06:58 2009
@@ -172,6 +172,10 @@
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(">>>> " + endpoint + " " + exchange);
                 }
+
+                // set property which endpoint we send to
+                exchange.setProperty(Exchange.TO_ENDPOINT, 
endpoint.getEndpointUri());
+
                 producer.process(exchange);
                 return exchange;
             }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=826967&r1=826966&r2=826967&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 Tue Oct 20 06:06:58 2009
@@ -33,6 +33,7 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
+import org.apache.camel.Producer;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.util.ExchangeHelper;
@@ -140,7 +141,7 @@
         final AtomicBoolean running = new AtomicBoolean(true);
 
         if (streaming) {
-            // execute tasks in paralle+streaming and aggregate in the order 
they are finished (out of order sequence)
+            // execute tasks in parallel+streaming and aggregate in the order 
they are finished (out of order sequence)
             completion = new 
ExecutorCompletionService<Exchange>(executorService);
         } else {
             // execute tasks in parallel and aggregate in the order the tasks 
are submitted (in order sequence)
@@ -162,6 +163,8 @@
                     }
 
                     try {
+                        // set property which endpoint we send to
+                        setToEndpoint(subExchange, producer);
                         producer.process(subExchange);
                     } catch (Exception e) {
                         subExchange.setException(e);
@@ -207,12 +210,14 @@
 
             // process it sequentially
             try {
+                // set property which endpoint we send to
+                setToEndpoint(subExchange, producer);
                 producer.process(subExchange);
             } catch (Exception e) {
                 subExchange.setException(e);
             }
 
-            // should we stop in case of an exception occured during 
processing?
+            // should we stop in case of an exception occurred during 
processing?
             if (stopOnException && subExchange.getException() != null) {
                 throw new CamelExchangeException("Sequential processing failed 
for number " + total, subExchange, subExchange.getException());
             }
@@ -280,6 +285,13 @@
         ServiceHelper.startServices(processors);
     }
     
+    private static void setToEndpoint(Exchange exchange, Processor processor) {
+        if (processor instanceof Producer) {
+            Producer producer = (Producer) processor;
+            exchange.setProperty(Exchange.TO_ENDPOINT, 
producer.getEndpoint().getEndpointUri());
+        }
+    }
+
     /**
      * Is the multicast processor working in streaming mode?
      * 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=826967&r1=826966&r2=826967&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
 Tue Oct 20 06:06:58 2009
@@ -304,7 +304,7 @@
             if (processor != null) {
                 data.failureProcessor = processor;
             }
-            // route specific on redelivey?
+            // route specific on redelivery?
             processor = exceptionPolicy.getOnRedelivery();
             if (processor != null) {
                 data.onRedeliveryProcessor = processor;
@@ -377,6 +377,8 @@
                 log.trace("Failure processor " + processor + " is processing 
Exchange: " + exchange);
             }
             try {
+                // store the last to endpoint as the failure endpoint
+                exchange.setProperty(Exchange.FAILURE_ENDPOINT, 
exchange.getProperty(Exchange.TO_ENDPOINT));
                 processor.process(exchange);
             } catch (Exception e) {
                 exchange.setException(e);
@@ -406,7 +408,9 @@
             } else {
                 // exception not handled, put exception back in the exchange
                 
exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, 
Exception.class));
-            }
+                // and put failure endpoint back as well
+                exchange.setProperty(Exchange.FAILURE_ENDPOINT, 
exchange.getProperty(Exchange.TO_ENDPOINT));
+             }
             return;
         }
 
@@ -418,6 +422,8 @@
             // exception not handled, put exception back in the exchange
             exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE);
             
exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, 
Exception.class));
+            // and put failure endpoint back as well
+            exchange.setProperty(Exchange.FAILURE_ENDPOINT, 
exchange.getProperty(Exchange.TO_ENDPOINT));
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("This exchange is handled so its marked as not 
failed: " + exchange);

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java?rev=826967&r1=826966&r2=826967&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
 Tue Oct 20 06:06:58 2009
@@ -76,6 +76,8 @@
 
             getProducerCache(exchange).doInProducer(endpoint, copy, null, new 
ProducerCallback<Object>() {
                 public Object doInProducer(Producer producer, Exchange 
exchange, ExchangePattern exchangePattern) throws Exception {
+                    // set property which endpoint we send to
+                    exchange.setProperty(Exchange.TO_ENDPOINT, 
producer.getEndpoint().getEndpointUri());
                     producer.process(exchange);
                     return exchange;
                 }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?rev=826967&r1=826966&r2=826967&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
 Tue Oct 20 06:06:58 2009
@@ -110,6 +110,8 @@
         if (pattern != null) {
             exchange.setPattern(pattern);
         }
+        // set property which endpoint we send to
+        exchange.setProperty(Exchange.TO_ENDPOINT, 
destination.getEndpointUri());
         return exchange;
     }
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=826967&r1=826966&r2=826967&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
 Tue Oct 20 06:06:58 2009
@@ -106,13 +106,17 @@
 
     @Override
     protected Exchange configureExchange(Exchange exchange, ExchangePattern 
pattern) {
+        Exchange answer;
         if (newExchangeProcessor == null && newExchangeExpression == null) {
             // use a copy of the original exchange
-            return configureCopyExchange(exchange);
+            answer = configureCopyExchange(exchange);
         } else {
             // use a new exchange
-            return configureNewExchange(exchange);
+            answer = configureNewExchange(exchange);
         }
+        // set property which endpoint we send to
+        answer.setProperty(Exchange.TO_ENDPOINT, destination.getEndpointUri());
+        return answer;
     }
 
     private Exchange configureCopyExchange(Exchange exchange) {

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ToEndpointPropertyTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ToEndpointPropertyTest.java?rev=826967&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ToEndpointPropertyTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ToEndpointPropertyTest.java
 Tue Oct 20 06:06:58 2009
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class ToEndpointPropertyTest extends ContextTestSupport {
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    public void testSimpleToEndpoint() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("mock:result");
+            }
+        });
+        context.start();
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        
mock.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://result");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testMediumToEndpoint() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("direct:foo");
+
+                from("direct:foo").to("mock:result");
+
+            }
+        });
+        context.start();
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        
mock.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://result");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testRecipientListToEndpoint() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").recipientList(header("foo"));
+            }
+        });
+        context.start();
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        
mock.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://result");
+
+        template.sendBodyAndHeader("direct:start", "Hello World", "foo", 
"mock:result");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testRoutingSlipToEndpoint() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").routingSlip("foo");
+            }
+        });
+        context.start();
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        
mock.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://result");
+
+        MockEndpoint a = getMockEndpoint("mock:a");
+        a.expectedMessageCount(1);
+        a.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://a");
+
+        MockEndpoint b = getMockEndpoint("mock:b");
+        b.expectedMessageCount(1);
+        b.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://b");
+
+        template.sendBodyAndHeader("direct:start", "Hello World", "foo", 
"mock:a,mock:b,mock:result");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testWireTapToEndpoint() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").wireTap("mock:tap").to("mock:result");
+            }
+        });
+        context.start();
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        
mock.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://result");
+
+        MockEndpoint tap = getMockEndpoint("mock:tap");
+        tap.expectedMessageCount(1);
+        tap.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://tap");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testMulticastToEndpoint() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").multicast().to("direct:a", 
"direct:b").end().process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        String to = exchange.getProperty(Exchange.TO_ENDPOINT, 
String.class);
+                        assertEquals("direct://b", to);
+                    }
+                }).to("mock:result");
+
+                from("direct:a").transform(constant("A"));
+                from("direct:b").transform(constant("B"));
+            }
+        });
+        context.start();
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+        result.message(0).property(Exchange.FAILURE_ENDPOINT).isNull();
+        
result.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://result");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testDLCToEndpoint() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
errorHandler(deadLetterChannel("mock:dead").disableRedelivery());
+
+                from("direct:start").to("direct:foo").to("mock:result");
+
+                from("direct:foo").throwException(new 
IllegalArgumentException("Damn"));
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        MockEndpoint dead = getMockEndpoint("mock:dead");
+        
dead.message(0).property(Exchange.FAILURE_ENDPOINT).isEqualTo("direct://foo");
+        
dead.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://dead");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testMediumDLCToEndpoint() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
errorHandler(deadLetterChannel("direct:dead").disableRedelivery());
+
+                from("direct:start").to("direct:foo").to("mock:result");
+
+                from("direct:foo").throwException(new 
IllegalArgumentException("Damn"));
+
+                from("direct:dead").to("mock:a").to("mock:b").to("mock:dead");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        MockEndpoint dead = getMockEndpoint("mock:dead");
+        
dead.message(0).property(Exchange.FAILURE_ENDPOINT).isEqualTo("direct://foo");
+        
dead.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://dead");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testMulticastDLC() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
errorHandler(deadLetterChannel("mock:dead").disableRedelivery());
+
+                from("direct:start").multicast().to("direct:a", "direct:b");
+
+                from("direct:a").transform(constant("A"));
+                from("direct:b").throwException(new 
IllegalArgumentException("Damn"));
+
+                from("direct:dead").to("mock:dead");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        MockEndpoint dead = getMockEndpoint("mock:dead");
+        
dead.message(0).property(Exchange.FAILURE_ENDPOINT).isEqualTo("direct://b");
+        
dead.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://dead");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+
+
+}

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ToEndpointPropertyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ToEndpointPropertyTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TracerTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TracerTest.java?rev=826967&r1=826966&r2=826967&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TracerTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TracerTest.java
 Tue Oct 20 06:06:58 2009
@@ -84,7 +84,7 @@
         assertNotNull(em.getExchangePattern());
         assertEquals("direct://start", em.getFromEndpointUri());
         assertNull(em.getHeaders());
-        assertNull(em.getProperties());
+        assertNotNull(em.getProperties());
         assertNull(em.getOutBody());
         assertNull(em.getOutBodyType());
         assertNull(em.getOutHeaders());


Reply via email to