This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit dac13922d47b0e07de1cec50ebfe569f096f3948 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Aug 6 16:23:24 2019 +0200 CAMEL-13828: DefaultExchangeHolder - Do not propagate exchange id for camel-jms with transferExchange as it's across JVMs etc. --- .../org/apache/camel/component/jms/JmsBinding.java | 2 +- ...ransferExchangeInflightRepositoryFlushTest.java | 12 ++---- .../camel/support/DefaultExchangeHolder.java | 46 ++++++++++------------ 3 files changed, 26 insertions(+), 34 deletions(-) diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java index fcfacd6..7f02f0a 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java @@ -519,7 +519,7 @@ public class JmsBinding { // special for transferExchange if (endpoint != null && endpoint.isTransferExchange()) { LOG.trace("Option transferExchange=true so we use JmsMessageType: Object"); - Serializable holder = DefaultExchangeHolder.marshal(exchange, true, endpoint.isAllowSerializedHeaders()); + Serializable holder = DefaultExchangeHolder.marshal(exchange, true, endpoint.isAllowSerializedHeaders(), false); Message answer = session.createObjectMessage(holder); // ensure default delivery mode is used by default answer.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE); diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutTransferExchangeInflightRepositoryFlushTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutTransferExchangeInflightRepositoryFlushTest.java index 0c83e9a..2b59fb8 100644 --- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutTransferExchangeInflightRepositoryFlushTest.java +++ 
b/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutTransferExchangeInflightRepositoryFlushTest.java @@ -65,21 +65,17 @@ public class JmsInOutTransferExchangeInflightRepositoryFlushTest extends CamelTe return new RouteBuilder() { public void configure() { from("direct:start") - .log("A ${exchangeId}") - .inOut("activemq:responseGenerator?transferExchange=true&requestTimeout=20000") - .log("A ${exchangeId}") - .to("log:result", "mock:result"); + .inOut("activemq:responseGenerator?transferExchange=true&requestTimeout=5000") + .to("mock:result"); from("activemq:responseGenerator?transferExchange=true") - .log("B ${exchangeId}") .process(new Processor() { public void process(Exchange exchange) throws Exception { // there are 2 inflight (one for both routes) assertEquals(2, exchange.getContext().getInflightRepository().size()); - exchange.getMessage().setBody(new SerializableResponseDto(true)); + exchange.getIn().setBody(new SerializableResponseDto(true)); } - }).to("log:reply") - .log("B ${exchangeId}"); + }); } }; } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchangeHolder.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchangeHolder.java index ad37459..5ed075c 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchangeHolder.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchangeHolder.java @@ -93,29 +93,7 @@ public class DefaultExchangeHolder implements Serializable { * @return the holder object with information copied form the exchange */ public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties) { - ObjectHelper.notNull(exchange, "exchange"); - - // we do not support files - Object body = exchange.getIn().getBody(); - if (body instanceof WrappedFile || body instanceof File) { - throw new RuntimeExchangeException("Message body of type " + body.getClass().getCanonicalName() + " is not 
supported by this marshaller.", exchange); - } - - DefaultExchangeHolder payload = new DefaultExchangeHolder(); - - payload.exchangeId = exchange.getExchangeId(); - payload.inBody = checkSerializableBody("in body", exchange, exchange.getIn().getBody()); - payload.safeSetInHeaders(exchange, false); - if (exchange.hasOut()) { - payload.outBody = checkSerializableBody("out body", exchange, exchange.getOut().getBody()); - payload.safeSetOutHeaders(exchange, false); - } - if (includeProperties) { - payload.safeSetProperties(exchange, false); - } - payload.exception = exchange.getException(); - - return payload; + return marshal(exchange, includeProperties, false, true); } /** @@ -127,6 +105,20 @@ public class DefaultExchangeHolder implements Serializable { * @return the holder object with information copied form the exchange */ public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties, boolean allowSerializedHeaders) { + return marshal(exchange, includeProperties, allowSerializedHeaders, true); + } + + /** + * Creates a payload object with the information from the given exchange. 
+ * + * @param exchange the exchange, must <b>not</b> be <tt>null</tt> + * @param includeProperties whether or not to include exchange properties + * @param allowSerializedHeaders whether or not to include serialized headers + * @param preserveExchangeId whether to preserve exchange id + * @return the holder object with information copied form the exchange + */ + public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties, + boolean allowSerializedHeaders, boolean preserveExchangeId) { ObjectHelper.notNull(exchange, "exchange"); // we do not support files @@ -137,7 +129,9 @@ public class DefaultExchangeHolder implements Serializable { DefaultExchangeHolder payload = new DefaultExchangeHolder(); - payload.exchangeId = exchange.getExchangeId(); + if (preserveExchangeId) { + payload.exchangeId = exchange.getExchangeId(); + } payload.inBody = checkSerializableBody("in body", exchange, exchange.getIn().getBody()); payload.safeSetInHeaders(exchange, allowSerializedHeaders); if (exchange.hasOut()) { @@ -162,7 +156,9 @@ public class DefaultExchangeHolder implements Serializable { ObjectHelper.notNull(exchange, "exchange"); ObjectHelper.notNull(payload, "payload"); - exchange.setExchangeId(payload.exchangeId); + if (payload.exchangeId != null) { + exchange.setExchangeId(payload.exchangeId); + } exchange.getIn().setBody(payload.inBody); if (payload.inHeaders != null) { exchange.getIn().setHeaders(payload.inHeaders);