Author: davsclaus
Date: Tue Oct 20 06:06:58 2009
New Revision: 826967
URL: http://svn.apache.org/viewvc?rev=826967&view=rev
Log:
CAMEL-2079: Camel now sets an exchange property with the endpoint the exchange was last sent to,
and one with the endpoint that failed when the exchange is moved to the dead letter channel (DLC)
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ToEndpointPropertyTest.java
(with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TracerTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=826967&r1=826966&r2=826967&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Tue Oct
20 06:06:58 2009
@@ -58,6 +58,7 @@
String EXCEPTION_CAUGHT = "CamelExceptionCaught";
String ERRORHANDLER_HANDLED = "CamelErrorHandlerHandled";
String FAILURE_HANDLED = "CamelFailureHandled";
+ String FAILURE_ENDPOINT = "CamelFailureEndpoint";
String FILE_LOCAL_WORK_PATH = "CamelFileLocalWorkPath";
String FILE_NAME = "CamelFileName";
@@ -79,6 +80,7 @@
String HTTP_URL = "CamelHttpUrl";
String INTERCEPTED_ENDPOINT = "CamelInterceptedEndpoint";
+ String TO_ENDPOINT = "CamelToEndpoint";
String LOG_DEBUG_BODY_MAX_CHARS = "CamelLogDebugBodyMaxChars";
String LOG_DEBUG_BODY_STREAMS = "CamelLogDebugStreams";
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=826967&r1=826966&r2=826967&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
Tue Oct 20 06:06:58 2009
@@ -172,6 +172,10 @@
if (LOG.isDebugEnabled()) {
LOG.debug(">>>> " + endpoint + " " + exchange);
}
+
+ // set property which endpoint we send to
+ exchange.setProperty(Exchange.TO_ENDPOINT,
endpoint.getEndpointUri());
+
producer.process(exchange);
return exchange;
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=826967&r1=826966&r2=826967&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Tue Oct 20 06:06:58 2009
@@ -33,6 +33,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
+import org.apache.camel.Producer;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.util.ExchangeHelper;
@@ -140,7 +141,7 @@
final AtomicBoolean running = new AtomicBoolean(true);
if (streaming) {
- // execute tasks in paralle+streaming and aggregate in the order
they are finished (out of order sequence)
+ // execute tasks in parallel+streaming and aggregate in the order
they are finished (out of order sequence)
completion = new
ExecutorCompletionService<Exchange>(executorService);
} else {
// execute tasks in parallel and aggregate in the order the tasks
are submitted (in order sequence)
@@ -162,6 +163,8 @@
}
try {
+ // set property which endpoint we send to
+ setToEndpoint(subExchange, producer);
producer.process(subExchange);
} catch (Exception e) {
subExchange.setException(e);
@@ -207,12 +210,14 @@
// process it sequentially
try {
+ // set property which endpoint we send to
+ setToEndpoint(subExchange, producer);
producer.process(subExchange);
} catch (Exception e) {
subExchange.setException(e);
}
- // should we stop in case of an exception occured during
processing?
+ // should we stop in case of an exception occurred during
processing?
if (stopOnException && subExchange.getException() != null) {
throw new CamelExchangeException("Sequential processing failed
for number " + total, subExchange, subExchange.getException());
}
@@ -280,6 +285,13 @@
ServiceHelper.startServices(processors);
}
+ private static void setToEndpoint(Exchange exchange, Processor processor) {
+ if (processor instanceof Producer) {
+ Producer producer = (Producer) processor;
+ exchange.setProperty(Exchange.TO_ENDPOINT,
producer.getEndpoint().getEndpointUri());
+ }
+ }
+
/**
* Is the multicast processor working in streaming mode?
*
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=826967&r1=826966&r2=826967&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
Tue Oct 20 06:06:58 2009
@@ -304,7 +304,7 @@
if (processor != null) {
data.failureProcessor = processor;
}
- // route specific on redelivey?
+ // route specific on redelivery?
processor = exceptionPolicy.getOnRedelivery();
if (processor != null) {
data.onRedeliveryProcessor = processor;
@@ -377,6 +377,8 @@
log.trace("Failure processor " + processor + " is processing
Exchange: " + exchange);
}
try {
+ // store the last to endpoint as the failure endpoint
+ exchange.setProperty(Exchange.FAILURE_ENDPOINT,
exchange.getProperty(Exchange.TO_ENDPOINT));
processor.process(exchange);
} catch (Exception e) {
exchange.setException(e);
@@ -406,7 +408,9 @@
} else {
// exception not handled, put exception back in the exchange
exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT,
Exception.class));
- }
+ // and put failure endpoint back as well
+ exchange.setProperty(Exchange.FAILURE_ENDPOINT,
exchange.getProperty(Exchange.TO_ENDPOINT));
+ }
return;
}
@@ -418,6 +422,8 @@
// exception not handled, put exception back in the exchange
exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE);
exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT,
Exception.class));
+ // and put failure endpoint back as well
+ exchange.setProperty(Exchange.FAILURE_ENDPOINT,
exchange.getProperty(Exchange.TO_ENDPOINT));
} else {
if (log.isDebugEnabled()) {
log.debug("This exchange is handled so its marked as not
failed: " + exchange);
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java?rev=826967&r1=826966&r2=826967&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
Tue Oct 20 06:06:58 2009
@@ -76,6 +76,8 @@
getProducerCache(exchange).doInProducer(endpoint, copy, null, new
ProducerCallback<Object>() {
public Object doInProducer(Producer producer, Exchange
exchange, ExchangePattern exchangePattern) throws Exception {
+ // set property which endpoint we send to
+ exchange.setProperty(Exchange.TO_ENDPOINT,
producer.getEndpoint().getEndpointUri());
producer.process(exchange);
return exchange;
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?rev=826967&r1=826966&r2=826967&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
Tue Oct 20 06:06:58 2009
@@ -110,6 +110,8 @@
if (pattern != null) {
exchange.setPattern(pattern);
}
+ // set property which endpoint we send to
+ exchange.setProperty(Exchange.TO_ENDPOINT,
destination.getEndpointUri());
return exchange;
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=826967&r1=826966&r2=826967&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
Tue Oct 20 06:06:58 2009
@@ -106,13 +106,17 @@
@Override
protected Exchange configureExchange(Exchange exchange, ExchangePattern
pattern) {
+ Exchange answer;
if (newExchangeProcessor == null && newExchangeExpression == null) {
// use a copy of the original exchange
- return configureCopyExchange(exchange);
+ answer = configureCopyExchange(exchange);
} else {
// use a new exchange
- return configureNewExchange(exchange);
+ answer = configureNewExchange(exchange);
}
+ // set property which endpoint we send to
+ answer.setProperty(Exchange.TO_ENDPOINT, destination.getEndpointUri());
+ return answer;
}
private Exchange configureCopyExchange(Exchange exchange) {
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ToEndpointPropertyTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ToEndpointPropertyTest.java?rev=826967&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ToEndpointPropertyTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ToEndpointPropertyTest.java
Tue Oct 20 06:06:58 2009
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class ToEndpointPropertyTest extends ContextTestSupport {
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ public void testSimpleToEndpoint() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").to("mock:result");
+ }
+ });
+ context.start();
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+
mock.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://result");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testMediumToEndpoint() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").to("direct:foo");
+
+ from("direct:foo").to("mock:result");
+
+ }
+ });
+ context.start();
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+
mock.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://result");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testRecipientListToEndpoint() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").recipientList(header("foo"));
+ }
+ });
+ context.start();
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+
mock.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://result");
+
+ template.sendBodyAndHeader("direct:start", "Hello World", "foo",
"mock:result");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testRoutingSlipToEndpoint() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").routingSlip("foo");
+ }
+ });
+ context.start();
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+
mock.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://result");
+
+ MockEndpoint a = getMockEndpoint("mock:a");
+ a.expectedMessageCount(1);
+ a.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://a");
+
+ MockEndpoint b = getMockEndpoint("mock:b");
+ b.expectedMessageCount(1);
+ b.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://b");
+
+ template.sendBodyAndHeader("direct:start", "Hello World", "foo",
"mock:a,mock:b,mock:result");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testWireTapToEndpoint() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").wireTap("mock:tap").to("mock:result");
+ }
+ });
+ context.start();
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+
mock.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://result");
+
+ MockEndpoint tap = getMockEndpoint("mock:tap");
+ tap.expectedMessageCount(1);
+ tap.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://tap");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testMulticastToEndpoint() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").multicast().to("direct:a",
"direct:b").end().process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ String to = exchange.getProperty(Exchange.TO_ENDPOINT,
String.class);
+ assertEquals("direct://b", to);
+ }
+ }).to("mock:result");
+
+ from("direct:a").transform(constant("A"));
+ from("direct:b").transform(constant("B"));
+ }
+ });
+ context.start();
+
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(1);
+ result.message(0).property(Exchange.FAILURE_ENDPOINT).isNull();
+
result.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://result");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testDLCToEndpoint() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+
errorHandler(deadLetterChannel("mock:dead").disableRedelivery());
+
+ from("direct:start").to("direct:foo").to("mock:result");
+
+ from("direct:foo").throwException(new
IllegalArgumentException("Damn"));
+ }
+ });
+ context.start();
+
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+ MockEndpoint dead = getMockEndpoint("mock:dead");
+
dead.message(0).property(Exchange.FAILURE_ENDPOINT).isEqualTo("direct://foo");
+
dead.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://dead");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testMediumDLCToEndpoint() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+
errorHandler(deadLetterChannel("direct:dead").disableRedelivery());
+
+ from("direct:start").to("direct:foo").to("mock:result");
+
+ from("direct:foo").throwException(new
IllegalArgumentException("Damn"));
+
+ from("direct:dead").to("mock:a").to("mock:b").to("mock:dead");
+ }
+ });
+ context.start();
+
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+ MockEndpoint dead = getMockEndpoint("mock:dead");
+
dead.message(0).property(Exchange.FAILURE_ENDPOINT).isEqualTo("direct://foo");
+
dead.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://dead");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testMulticastDLC() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+
errorHandler(deadLetterChannel("mock:dead").disableRedelivery());
+
+ from("direct:start").multicast().to("direct:a", "direct:b");
+
+ from("direct:a").transform(constant("A"));
+ from("direct:b").throwException(new
IllegalArgumentException("Damn"));
+
+ from("direct:dead").to("mock:dead");
+ }
+ });
+ context.start();
+
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+ MockEndpoint dead = getMockEndpoint("mock:dead");
+
dead.message(0).property(Exchange.FAILURE_ENDPOINT).isEqualTo("direct://b");
+
dead.message(0).property(Exchange.TO_ENDPOINT).isEqualTo("mock://dead");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+
+
+}
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ToEndpointPropertyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ToEndpointPropertyTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TracerTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TracerTest.java?rev=826967&r1=826966&r2=826967&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TracerTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/TracerTest.java
Tue Oct 20 06:06:58 2009
@@ -84,7 +84,7 @@
assertNotNull(em.getExchangePattern());
assertEquals("direct://start", em.getFromEndpointUri());
assertNull(em.getHeaders());
- assertNull(em.getProperties());
+ assertNotNull(em.getProperties());
assertNull(em.getOutBody());
assertNull(em.getOutBodyType());
assertNull(em.getOutHeaders());