This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push: new f9e6e97 CAMEL-16222: PooledExchangeFactory experiment f9e6e97 is described below commit f9e6e97da840e7d3534c209251d7479698f9ea5a Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Feb 23 15:24:26 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../camel/component/redis/RedisConsumer.java | 8 +-- .../spring/ws/SpringWebserviceConsumer.java | 75 ++++++++++++++-------- .../spring/ws/SpringWebserviceMessage.java | 6 ++ 3 files changed, 59 insertions(+), 30 deletions(-) diff --git a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisConsumer.java b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisConsumer.java index b6c85df..72da864 100644 --- a/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisConsumer.java +++ b/components/camel-spring-redis/src/main/java/org/apache/camel/component/redis/RedisConsumer.java @@ -16,7 +16,6 @@ */ package org.apache.camel.component.redis; -import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; @@ -73,13 +72,14 @@ public class RedisConsumer extends DefaultConsumer implements MessageListener { @Override public void onMessage(Message message, byte[] pattern) { try { - Exchange exchange = getEndpoint().createExchange(); + Exchange exchange = createExchange(true); setChannel(exchange, message.getChannel()); setPattern(exchange, pattern); setBody(exchange, message.getBody()); + getProcessor().process(exchange); } catch (Exception e) { - throw new RuntimeException(e); + getExceptionHandler().handleException("Error processing redis message", e); } } @@ -95,7 +95,7 @@ public class RedisConsumer extends DefaultConsumer implements MessageListener { } } - private void setChannel(Exchange exchange, byte[] message) throws UnsupportedEncodingException { + private void setChannel(Exchange exchange, byte[] message) { if (message != null) { exchange.getIn().setHeader(RedisConstants.CHANNEL, new String(message, StandardCharsets.UTF_8)); } diff --git a/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceConsumer.java b/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceConsumer.java index 9b12824..abebef4 100644 --- a/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceConsumer.java +++ b/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceConsumer.java @@ -28,9 +28,9 @@ import javax.xml.transform.Source; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; -import org.apache.camel.attachment.AttachmentMessage; import org.apache.camel.attachment.DefaultAttachmentMessage; import org.apache.camel.support.DefaultConsumer; import org.springframework.ws.WebServiceMessage; @@ -61,25 +61,32 @@ public class SpringWebserviceConsumer extends DefaultConsumer implements Message */ @Override public void invoke(MessageContext messageContext) throws Exception { - Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOptionalOut); - populateExchangeFromMessageContext(messageContext, exchange); - - // populate camel exchange with breadcrumb from transport header - populateExchangeWithBreadcrumbFromMessageContext(messageContext, exchange); - - // start message processing - getProcessor().process(exchange); - - if (exchange.getException() != null) { - throw exchange.getException(); - } else if (exchange.getPattern().isOutCapable()) { - Message responseMessage = exchange.getMessage(Message.class); - if (responseMessage != null) { - Source responseBody = responseMessage.getBody(Source.class); - WebServiceMessage response = messageContext.getResponse(); - configuration.getMessageFilter().filterConsumer(exchange, response); - toResult(responseBody, response.getPayloadResult()); + Exchange exchange = createExchange(false); + try { + exchange.setPattern(ExchangePattern.InOptionalOut); + populateExchangeFromMessageContext(messageContext, exchange); + + // populate camel exchange with breadcrumb from transport header + if (getEndpoint().getCamelContext().isUseBreadcrumb()) { + populateExchangeWithBreadcrumbFromMessageContext(messageContext, exchange); } + + // start message processing + getProcessor().process(exchange); + + if (exchange.getException() != null) { + throw exchange.getException(); + } else if (exchange.getPattern().isOutCapable()) { + Message responseMessage = exchange.getMessage(Message.class); + if (responseMessage != null) { + Source responseBody = responseMessage.getBody(Source.class); + WebServiceMessage response = messageContext.getResponse(); + configuration.getMessageFilter().filterConsumer(exchange, response); + toResult(responseBody, response.getPayloadResult()); + } + } + } finally { + releaseExchange(exchange, false); } } @@ -116,7 +123,9 @@ public class SpringWebserviceConsumer extends DefaultConsumer implements Message if (messageContext != null) { HttpServletRequest obj = (HttpServletRequest) messageContext.getProperty("transport.http.servletRequest"); String breadcrumbId = obj.getHeader(Exchange.BREADCRUMB_ID); - exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, breadcrumbId); + if (breadcrumbId != null) { + exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, breadcrumbId); + } } } @@ -125,10 +134,17 @@ public class SpringWebserviceConsumer extends DefaultConsumer implements Message // create inbound message WebServiceMessage request = messageContext.getRequest(); - SpringWebserviceMessage inMessage = new SpringWebserviceMessage(exchange.getContext(), request); - exchange.setIn(inMessage); - extractSourceFromSoapHeader(inMessage.getHeaders(), request); - extractAttachmentsFromRequest(request, new DefaultAttachmentMessage(inMessage)); + + SpringWebserviceMessage swm = exchange.adapt(ExtendedExchange.class).getInOrNull(SpringWebserviceMessage.class); + if (swm == null) { + swm = new SpringWebserviceMessage(exchange.getContext(), request); + exchange.setIn(swm); + } else { + swm.setWebServiceMessage(request); + } + + extractSourceFromSoapHeader(exchange.getIn().getHeaders(), request); + extractAttachmentsFromRequest(request, exchange); } private void populateExchangeWithPropertiesFromMessageContext( @@ -181,12 +197,19 @@ public class SpringWebserviceConsumer extends DefaultConsumer implements Message private void extractAttachmentsFromRequest( final WebServiceMessage request, - final AttachmentMessage inMessage) { + final Exchange exchange) { + + DefaultAttachmentMessage dam = null; + if (request instanceof MimeMessage) { Iterator<Attachment> attachmentsIterator = ((MimeMessage) request).getAttachments(); while (attachmentsIterator.hasNext()) { Attachment attachment = attachmentsIterator.next(); - inMessage.addAttachment(attachment.getContentId(), attachment.getDataHandler()); + if (dam == null) { + // this is just a wrapper which will set data on the IN + dam = new DefaultAttachmentMessage(exchange.getIn()); + } + dam.addAttachment(attachment.getContentId(), attachment.getDataHandler()); } } } diff --git a/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceMessage.java b/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceMessage.java index 6101345..49274a4 100644 --- a/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceMessage.java +++ b/components/camel-spring-ws/src/main/java/org/apache/camel/component/spring/ws/SpringWebserviceMessage.java @@ -29,6 +29,12 @@ public class SpringWebserviceMessage extends DefaultMessage { } @Override + public void reset() { + super.reset(); + webServiceMessage = null; + } + + @Override protected Object createBody() { if (webServiceMessage != null) { return webServiceMessage.getPayloadSource();