CAMEL-9116: camel-sjms should use the same binding to/from JMS as camel-jms does.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/25103bf6 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/25103bf6 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/25103bf6 Branch: refs/heads/master Commit: 25103bf65d15da2eafe147299f563385c33ff526 Parents: 93bf668 Author: Claus Ibsen <[email protected]> Authored: Mon Sep 7 11:24:54 2015 +0200 Committer: Claus Ibsen <[email protected]> Committed: Mon Sep 7 11:24:54 2015 +0200 ---------------------------------------------------------------------- .../camel/component/sjms/jms/JmsBinding.java | 78 ++++++++++---------- .../component/sjms/producer/InOutProducer.java | 41 ++++------ 2 files changed, 55 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/25103bf6/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java index 8dc2841..773813c 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java @@ -39,6 +39,8 @@ import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.TextMessage; +import org.w3c.dom.Node; + import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.NoTypeConversionAvailableException; @@ -52,7 +54,6 @@ import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.w3c.dom.Node; import static 
org.apache.camel.component.sjms.jms.JmsMessageHelper.normalizeDestinationName; @@ -455,7 +456,8 @@ public class JmsBinding { // force a specific type from the endpoint configuration type = endpoint.getConfiguration().getJmsMessageType(); } else { -*/ type = getJMSMessageTypeForBody(exchange, body, headers, session, context); +*/ + type = getJMSMessageTypeForBody(exchange, body, headers, session, context); //} // create the JmsMessage based on the type @@ -523,46 +525,46 @@ public class JmsBinding { */ protected Message createJmsMessageForType(Exchange exchange, Object body, Map<String, Object> headers, Session session, CamelContext context, JmsMessageType type) throws JMSException { switch (type) { - case Text: { - TextMessage message = session.createTextMessage(); - if (body != null) { - String payload = context.getTypeConverter().convertTo(String.class, exchange, body); - message.setText(payload); - } - return message; + case Text: { + TextMessage message = session.createTextMessage(); + if (body != null) { + String payload = context.getTypeConverter().convertTo(String.class, exchange, body); + message.setText(payload); } - case Bytes: { - BytesMessage message = session.createBytesMessage(); - if (body != null) { - byte[] payload = context.getTypeConverter().convertTo(byte[].class, exchange, body); - message.writeBytes(payload); - } - return message; + return message; + } + case Bytes: { + BytesMessage message = session.createBytesMessage(); + if (body != null) { + byte[] payload = context.getTypeConverter().convertTo(byte[].class, exchange, body); + message.writeBytes(payload); } - case Map: { - MapMessage message = session.createMapMessage(); - if (body != null) { - Map<?, ?> payload = context.getTypeConverter().convertTo(Map.class, exchange, body); - populateMapMessage(message, payload, context); - } - return message; + return message; + } + case Map: { + MapMessage message = session.createMapMessage(); + if (body != null) { + Map<?, ?> payload = 
context.getTypeConverter().convertTo(Map.class, exchange, body); + populateMapMessage(message, payload, context); } - case Object: - ObjectMessage message = session.createObjectMessage(); - if (body != null) { - try { - Serializable payload = context.getTypeConverter().mandatoryConvertTo(Serializable.class, exchange, body); - message.setObject(payload); - } catch (NoTypeConversionAvailableException e) { - // cannot convert to serializable then thrown an exception to avoid sending a null message - JMSException cause = new MessageFormatException(e.getMessage()); - cause.initCause(e); - throw cause; - } + return message; + } + case Object: + ObjectMessage message = session.createObjectMessage(); + if (body != null) { + try { + Serializable payload = context.getTypeConverter().mandatoryConvertTo(Serializable.class, exchange, body); + message.setObject(payload); + } catch (NoTypeConversionAvailableException e) { + // cannot convert to serializable then thrown an exception to avoid sending a null message + JMSException cause = new MessageFormatException(e.getMessage()); + cause.initCause(e); + throw cause; } - return message; - default: - break; + } + return message; + default: + break; } return null; } http://git-wip-us.apache.org/repos/asf/camel/blob/25103bf6/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java index 1c535b6..202b429 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java @@ -17,7 +17,6 @@ package org.apache.camel.component.sjms.producer; import java.util.Map; -import java.util.UUID; 
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; @@ -43,7 +42,6 @@ import org.apache.camel.component.sjms.jms.JmsMessageHelper; import org.apache.camel.component.sjms.jms.JmsObjectFactory; import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization; import org.apache.camel.spi.UuidGenerator; -import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; import org.apache.commons.pool.BasePoolableObjectFactory; import org.apache.commons.pool.impl.GenericObjectPool; @@ -57,6 +55,11 @@ public class InOutProducer extends SjmsProducer { private static final String GENERATED_CORRELATION_ID_PREFIX = "Camel-"; private UuidGenerator uuidGenerator; + private GenericObjectPool<MessageConsumerResources> consumers; + + public InOutProducer(final SjmsEndpoint endpoint) { + super(endpoint); + } public UuidGenerator getUuidGenerator() { return uuidGenerator; @@ -67,8 +70,7 @@ public class InOutProducer extends SjmsProducer { } /** - * A pool of {@link MessageConsumerResources} objects that are the reply - * consumers. + * A pool of {@link MessageConsumerResources} objects that are the reply consumers. 
*/ protected class MessageConsumerResourcesFactory extends BasePoolableObjectFactory<MessageConsumerResources> { @@ -135,12 +137,6 @@ public class InOutProducer extends SjmsProducer { } } - private GenericObjectPool<MessageConsumerResources> consumers; - - public InOutProducer(final SjmsEndpoint endpoint) { - super(endpoint); - } - @Override protected void doStart() throws Exception { if (ObjectHelper.isEmpty(getNamedReplyTo())) { @@ -152,12 +148,12 @@ public class InOutProducer extends SjmsProducer { // use the generator configured on the camel context uuidGenerator = getEndpoint().getCamelContext().getUuidGenerator(); } - if (getConsumers() == null) { - setConsumers(new GenericObjectPool<MessageConsumerResources>(new MessageConsumerResourcesFactory())); - getConsumers().setMaxActive(getConsumerCount()); - getConsumers().setMaxIdle(getConsumerCount()); - while (getConsumers().getNumIdle() < getConsumers().getMaxIdle()) { - getConsumers().addObject(); + if (consumers == null) { + consumers = new GenericObjectPool<MessageConsumerResources>(new MessageConsumerResourcesFactory()); + consumers.setMaxActive(getConsumerCount()); + consumers.setMaxIdle(getConsumerCount()); + while (consumers.getNumIdle() < consumers.getMaxIdle()) { + consumers.addObject(); } } super.doStart(); @@ -166,9 +162,9 @@ public class InOutProducer extends SjmsProducer { @Override protected void doStop() throws Exception { super.doStop(); - if (getConsumers() != null) { - getConsumers().close(); - setConsumers(null); + if (consumers != null) { + consumers.close(); + consumers = null; } } @@ -260,11 +256,4 @@ public class InOutProducer extends SjmsProducer { callback.done(isSynchronous()); } - public void setConsumers(GenericObjectPool<MessageConsumerResources> consumers) { - this.consumers = consumers; - } - - public GenericObjectPool<MessageConsumerResources> getConsumers() { - return consumers; - } }
