Polished camel broker component
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7659bfcb Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7659bfcb Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7659bfcb Branch: refs/heads/activemq-5.9 Commit: 7659bfcbda00724fe35b33cb181b4c638094583c Parents: b54a942 Author: Claus Ibsen <[email protected]> Authored: Sat Nov 2 19:27:00 2013 +0100 Committer: Hadrian Zbarcea <[email protected]> Committed: Wed Mar 12 08:30:28 2014 -0400 ---------------------------------------------------------------------- .../camel/component/broker/BrokerComponent.java | 3 +- .../component/broker/BrokerConfiguration.java | 2 - .../camel/component/broker/BrokerConsumer.java | 10 +++-- .../camel/component/broker/BrokerEndpoint.java | 7 +--- .../component/broker/BrokerJmsMessage.java | 12 +++--- .../camel/component/broker/BrokerProducer.java | 39 ++++++-------------- 6 files changed, 25 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/7659bfcb/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerComponent.java ---------------------------------------------------------------------- diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerComponent.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerComponent.java index 3fce9ae..16edd30 100644 --- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerComponent.java +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerComponent.java @@ -57,13 +57,12 @@ public class BrokerComponent extends UriEndpointComponent implements EndpointCom remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TEMP_TOPIC_PREFIX.length()), '/'); } - ActiveMQDestination destination = ActiveMQDestination.createDestination(remaining, destinationType); BrokerEndpoint brokerEndpoint = new BrokerEndpoint(uri, this, destination, brokerConfiguration); + setProperties(brokerEndpoint, parameters); return brokerEndpoint; } - @Override public List<String> completeEndpointPath(ComponentConfiguration componentConfiguration, String completionText) { String brokerName = String.valueOf(componentConfiguration.getParameter("brokerName")); http://git-wip-us.apache.org/repos/asf/activemq/blob/7659bfcb/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConfiguration.java ---------------------------------------------------------------------- diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConfiguration.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConfiguration.java index 583720e..6609533 100644 --- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConfiguration.java +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConfiguration.java @@ -23,7 +23,6 @@ public class BrokerConfiguration { @UriParam private String brokerName = ""; - public String getBrokerName() { return brokerName; } @@ -32,5 +31,4 @@ public class BrokerConfiguration { this.brokerName = brokerName; } - } http://git-wip-us.apache.org/repos/asf/activemq/blob/7659bfcb/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConsumer.java ---------------------------------------------------------------------- diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConsumer.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConsumer.java index 39b25e2..1dc52ae 100644 --- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConsumer.java +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConsumer.java @@ -25,11 +25,8 @@ import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; import org.apache.camel.component.jms.JmsBinding; import org.apache.camel.impl.DefaultConsumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class BrokerConsumer extends DefaultConsumer implements MessageInterceptor { - protected final transient Logger logger = LoggerFactory.getLogger(BrokerConsumer.class); private final JmsBinding jmsBinding = new JmsBinding(); public BrokerConsumer(Endpoint endpoint, Processor processor) { @@ -58,7 +55,12 @@ public class BrokerConsumer extends DefaultConsumer implements MessageIntercepto try { getProcessor().process(exchange); } catch (Exception e) { - logger.error("Failed to process " + exchange, e); + exchange.setException(e); + } + + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing intercepted message: " + message, exchange, exchange.getException()); } } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/7659bfcb/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java ---------------------------------------------------------------------- diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java index e327669..55e7f7b 100644 --- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java @@ -38,15 +38,13 @@ import org.apache.camel.util.UnsafeUriCharactersEncoder; @ManagedResource(description = "Managed Camel Broker Endpoint") @UriEndpoint(scheme = "broker", consumerClass = BrokerConsumer.class) - public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumersSupport, Service { + static final String PRODUCER_BROKER_EXCHANGE = "producerBrokerExchange"; @UriParam private final BrokerConfiguration configuration; private MessageInterceptorRegistry messageInterceptorRegistry; - - @UriPath private final ActiveMQDestination destination; private List<MessageInterceptor> messageInterceptorList = new CopyOnWriteArrayList<MessageInterceptor>(); @@ -70,7 +68,6 @@ public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumers return consumer; } - @Override public boolean isSingleton() { return false; @@ -85,7 +82,6 @@ public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumers return destination; } - @Override protected void doStart() throws Exception { super.doStart(); @@ -111,7 +107,6 @@ public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumers protected void removeMessageInterceptor(MessageInterceptor messageInterceptor) { messageInterceptorRegistry.removeMessageInterceptor(destination, messageInterceptor); - } protected void inject(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq/blob/7659bfcb/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerJmsMessage.java ---------------------------------------------------------------------- diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerJmsMessage.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerJmsMessage.java index 02dcabe..f77e97b 100644 --- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerJmsMessage.java +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerJmsMessage.java @@ -17,11 +17,14 @@ package org.apache.activemq.camel.component.broker; import javax.jms.Message; + import org.apache.camel.component.jms.JmsBinding; import org.apache.camel.component.jms.JmsMessage; +import org.apache.camel.component.jms.JmsMessageHelper; import org.apache.camel.util.ObjectHelper; public class BrokerJmsMessage extends JmsMessage { + public BrokerJmsMessage(Message jmsMessage, JmsBinding binding) { super(jmsMessage, binding); } @@ -29,12 +32,10 @@ public class BrokerJmsMessage extends JmsMessage { @Override public String toString() { if (getJmsMessage() != null) { - try { - return "BrokerJmsMessage[JMSMessageID: " + getJmsMessage().getJMSMessageID(); - } catch (Exception e) { - } + return "BrokerJmsMessage[JMSMessageID: " + JmsMessageHelper.getJMSMessageID(getJmsMessage()); + } else { + return "BrokerJmsMessage@" + ObjectHelper.getIdentityHashCode(this); } - return "BrokerJmsMessage@" + ObjectHelper.getIdentityHashCode(this); } @Override @@ -45,7 +46,6 @@ public class BrokerJmsMessage extends JmsMessage { } } - @Override public BrokerJmsMessage newInstance() { return new BrokerJmsMessage(null, getBinding()); http://git-wip-us.apache.org/repos/asf/activemq/blob/7659bfcb/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java ---------------------------------------------------------------------- diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java index c12fbee..fcf1256 100644 --- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java @@ -17,10 +17,8 @@ package org.apache.activemq.camel.component.broker; import java.util.Map; -import java.util.concurrent.RejectedExecutionException; import org.apache.activemq.broker.ProducerBrokerExchange; -import org.apache.activemq.camel.converter.ActiveMQMessageConverter; import org.apache.activemq.command.ActiveMQMessage; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; @@ -30,7 +28,6 @@ import org.apache.camel.converter.ObjectConverter; import org.apache.camel.impl.DefaultAsyncProducer; public class BrokerProducer extends DefaultAsyncProducer { - private final ActiveMQMessageConverter activeMQConverter = new ActiveMQMessageConverter(); private final BrokerEndpoint brokerEndpoint; public BrokerProducer(BrokerEndpoint endpoint) { @@ -38,24 +35,12 @@ public class BrokerProducer extends DefaultAsyncProducer { brokerEndpoint = endpoint; } - @Override public boolean process(Exchange exchange, AsyncCallback callback) { - // deny processing if we are not started - if (!isRunAllowed()) { - if (exchange.getException() == null) { - exchange.setException(new RejectedExecutionException()); - } - // we cannot process so invoke callback - callback.done(true); - return true; - } - try { //In the middle of the broker - InOut doesn't make any sense //so we do in only return processInOnly(exchange, callback); - } catch (Throwable e) { // must catch exception to ensure callback is invoked as expected // to let Camel error handling deal with this @@ -74,8 +59,6 @@ public class BrokerProducer extends DefaultAsyncProducer { ProducerBrokerExchange producerBrokerExchange = (ProducerBrokerExchange) exchange.getProperty(BrokerEndpoint.PRODUCER_BROKER_EXCHANGE); brokerEndpoint.inject(producerBrokerExchange, message); - - } } catch (Exception e) { exchange.setException(e); @@ -85,34 +68,34 @@ public class BrokerProducer extends DefaultAsyncProducer { } private ActiveMQMessage getMessage(Exchange exchange) throws Exception { - ActiveMQMessage result = null; - Message camelMesssage = null; + ActiveMQMessage result; + Message camelMessage; if (exchange.hasOut()) { - camelMesssage = exchange.getOut(); + camelMessage = exchange.getOut(); } else { - camelMesssage = exchange.getIn(); + camelMessage = exchange.getIn(); } - Map<String, Object> headers = camelMesssage.getHeaders(); + Map<String, Object> headers = camelMessage.getHeaders(); /** * We purposely don't want to support injecting messages half-way through * broker processing - use the activemq camel component for that - but * we will support changing message headers and destinations */ - if (camelMesssage instanceof JmsMessage) { - JmsMessage jmsMessage = (JmsMessage) camelMesssage; + if (camelMessage instanceof JmsMessage) { + JmsMessage jmsMessage = (JmsMessage) camelMessage; if (jmsMessage.getJmsMessage() instanceof ActiveMQMessage) { result = (ActiveMQMessage) jmsMessage.getJmsMessage(); //lets apply any new message headers setJmsHeaders(result, headers); } else { - - throw new IllegalStateException("not the original message from the broker " + jmsMessage.getJmsMessage()); + throw new IllegalStateException("Not the original message from the broker " + jmsMessage.getJmsMessage()); } } else { - throw new IllegalStateException("not the original message from the broker " + camelMesssage); + throw new IllegalStateException("Not the original message from the broker " + camelMessage); } + return result; } @@ -154,6 +137,6 @@ public class BrokerProducer extends DefaultAsyncProducer { } } } - } + }
