Polished camel broker component

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7659bfcb
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7659bfcb
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7659bfcb

Branch: refs/heads/activemq-5.9
Commit: 7659bfcbda00724fe35b33cb181b4c638094583c
Parents: b54a942
Author: Claus Ibsen <[email protected]>
Authored: Sat Nov 2 19:27:00 2013 +0100
Committer: Hadrian Zbarcea <[email protected]>
Committed: Wed Mar 12 08:30:28 2014 -0400

----------------------------------------------------------------------
 .../camel/component/broker/BrokerComponent.java |  3 +-
 .../component/broker/BrokerConfiguration.java   |  2 -
 .../camel/component/broker/BrokerConsumer.java  | 10 +++--
 .../camel/component/broker/BrokerEndpoint.java  |  7 +---
 .../component/broker/BrokerJmsMessage.java      | 12 +++---
 .../camel/component/broker/BrokerProducer.java  | 39 ++++++--------------
 6 files changed, 25 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7659bfcb/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerComponent.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerComponent.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerComponent.java
index 3fce9ae..16edd30 100644
--- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerComponent.java
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerComponent.java
@@ -57,13 +57,12 @@ public class BrokerComponent extends UriEndpointComponent 
implements EndpointCom
             remaining = 
removeStartingCharacters(remaining.substring(JmsConfiguration.TEMP_TOPIC_PREFIX.length()),
 '/');
         }
 
-
         ActiveMQDestination destination = 
ActiveMQDestination.createDestination(remaining, destinationType);
         BrokerEndpoint brokerEndpoint = new BrokerEndpoint(uri, this, 
destination, brokerConfiguration);
+        setProperties(brokerEndpoint, parameters);
         return brokerEndpoint;
     }
 
-
     @Override
     public List<String> completeEndpointPath(ComponentConfiguration 
componentConfiguration, String completionText) {
         String brokerName = 
String.valueOf(componentConfiguration.getParameter("brokerName"));

http://git-wip-us.apache.org/repos/asf/activemq/blob/7659bfcb/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConfiguration.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConfiguration.java
index 583720e..6609533 100644
--- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConfiguration.java
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConfiguration.java
@@ -23,7 +23,6 @@ public class BrokerConfiguration {
     @UriParam
     private String brokerName = "";
 
-
     public String getBrokerName() {
         return brokerName;
     }
@@ -32,5 +31,4 @@ public class BrokerConfiguration {
         this.brokerName = brokerName;
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/7659bfcb/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConsumer.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConsumer.java
index 39b25e2..1dc52ae 100644
--- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConsumer.java
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConsumer.java
@@ -25,11 +25,8 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.component.jms.JmsBinding;
 import org.apache.camel.impl.DefaultConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class BrokerConsumer extends DefaultConsumer implements 
MessageInterceptor {
-    protected final transient Logger logger = 
LoggerFactory.getLogger(BrokerConsumer.class);
     private final JmsBinding jmsBinding = new JmsBinding();
 
     public BrokerConsumer(Endpoint endpoint, Processor processor) {
@@ -58,7 +55,12 @@ public class BrokerConsumer extends DefaultConsumer 
implements MessageIntercepto
         try {
             getProcessor().process(exchange);
         } catch (Exception e) {
-            logger.error("Failed to process " + exchange, e);
+            exchange.setException(e);
+        }
+
+        if (exchange.getException() != null) {
+            getExceptionHandler().handleException("Error processing intercepted message: " + message, exchange, exchange.getException());
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/7659bfcb/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java
index e327669..55e7f7b 100644
--- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java
@@ -38,15 +38,13 @@ import org.apache.camel.util.UnsafeUriCharactersEncoder;
 
 @ManagedResource(description = "Managed Camel Broker Endpoint")
 @UriEndpoint(scheme = "broker", consumerClass = BrokerConsumer.class)
-
 public class BrokerEndpoint extends DefaultEndpoint implements 
MultipleConsumersSupport, Service {
+
     static final String PRODUCER_BROKER_EXCHANGE = "producerBrokerExchange";
 
     @UriParam
     private final BrokerConfiguration configuration;
     private MessageInterceptorRegistry messageInterceptorRegistry;
-
-
     @UriPath
     private final ActiveMQDestination destination;
     private List<MessageInterceptor> messageInterceptorList = new 
CopyOnWriteArrayList<MessageInterceptor>();
@@ -70,7 +68,6 @@ public class BrokerEndpoint extends DefaultEndpoint 
implements MultipleConsumers
         return consumer;
     }
 
-
     @Override
     public boolean isSingleton() {
         return false;
@@ -85,7 +82,6 @@ public class BrokerEndpoint extends DefaultEndpoint 
implements MultipleConsumers
         return destination;
     }
 
-
     @Override
     protected void doStart() throws Exception {
         super.doStart();
@@ -111,7 +107,6 @@ public class BrokerEndpoint extends DefaultEndpoint 
implements MultipleConsumers
 
     protected void removeMessageInterceptor(MessageInterceptor 
messageInterceptor) {
         messageInterceptorRegistry.removeMessageInterceptor(destination, 
messageInterceptor);
-
     }
 
     protected void inject(ProducerBrokerExchange producerBrokerExchange, 
Message message) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/7659bfcb/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerJmsMessage.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerJmsMessage.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerJmsMessage.java
index 02dcabe..f77e97b 100644
--- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerJmsMessage.java
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerJmsMessage.java
@@ -17,11 +17,14 @@
 package org.apache.activemq.camel.component.broker;
 
 import javax.jms.Message;
+
 import org.apache.camel.component.jms.JmsBinding;
 import org.apache.camel.component.jms.JmsMessage;
+import org.apache.camel.component.jms.JmsMessageHelper;
 import org.apache.camel.util.ObjectHelper;
 
 public class BrokerJmsMessage extends JmsMessage {
+
     public BrokerJmsMessage(Message jmsMessage, JmsBinding binding) {
         super(jmsMessage, binding);
     }
@@ -29,12 +32,10 @@ public class BrokerJmsMessage extends JmsMessage {
     @Override
     public String toString() {
         if (getJmsMessage() != null) {
-            try {
-                return "BrokerJmsMessage[JMSMessageID: " + getJmsMessage().getJMSMessageID();
-            } catch (Exception e) {
-            }
+            return "BrokerJmsMessage[JMSMessageID: " + JmsMessageHelper.getJMSMessageID(getJmsMessage());
+        } else {
+            return "BrokerJmsMessage@" + ObjectHelper.getIdentityHashCode(this);
         }
-        return "BrokerJmsMessage@" + ObjectHelper.getIdentityHashCode(this);
     }
 
     @Override
@@ -45,7 +46,6 @@ public class BrokerJmsMessage extends JmsMessage {
         }
     }
 
-
     @Override
     public BrokerJmsMessage newInstance() {
         return new BrokerJmsMessage(null, getBinding());

http://git-wip-us.apache.org/repos/asf/activemq/blob/7659bfcb/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java
index c12fbee..fcf1256 100644
--- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java
@@ -17,10 +17,8 @@
 package org.apache.activemq.camel.component.broker;
 
 import java.util.Map;
-import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.camel.converter.ActiveMQMessageConverter;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
@@ -30,7 +28,6 @@ import org.apache.camel.converter.ObjectConverter;
 import org.apache.camel.impl.DefaultAsyncProducer;
 
 public class BrokerProducer extends DefaultAsyncProducer {
-    private final ActiveMQMessageConverter activeMQConverter = new 
ActiveMQMessageConverter();
     private final BrokerEndpoint brokerEndpoint;
 
     public BrokerProducer(BrokerEndpoint endpoint) {
@@ -38,24 +35,12 @@ public class BrokerProducer extends DefaultAsyncProducer {
         brokerEndpoint = endpoint;
     }
 
-
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        // deny processing if we are not started
-        if (!isRunAllowed()) {
-            if (exchange.getException() == null) {
-                exchange.setException(new RejectedExecutionException());
-            }
-            // we cannot process so invoke callback
-            callback.done(true);
-            return true;
-        }
-
         try {
             //In the middle of the broker - InOut doesn't make any sense
             //so we do in only
             return processInOnly(exchange, callback);
-
         } catch (Throwable e) {
             // must catch exception to ensure callback is invoked as expected
             // to let Camel error handling deal with this
@@ -74,8 +59,6 @@ public class BrokerProducer extends DefaultAsyncProducer {
                 ProducerBrokerExchange producerBrokerExchange = 
(ProducerBrokerExchange) 
exchange.getProperty(BrokerEndpoint.PRODUCER_BROKER_EXCHANGE);
 
                 brokerEndpoint.inject(producerBrokerExchange, message);
-
-
             }
         } catch (Exception e) {
             exchange.setException(e);
@@ -85,34 +68,34 @@ public class BrokerProducer extends DefaultAsyncProducer {
     }
 
     private ActiveMQMessage getMessage(Exchange exchange) throws Exception {
-        ActiveMQMessage result = null;
-        Message camelMesssage = null;
+        ActiveMQMessage result;
+        Message camelMessage;
         if (exchange.hasOut()) {
-            camelMesssage = exchange.getOut();
+            camelMessage = exchange.getOut();
         } else {
-            camelMesssage = exchange.getIn();
+            camelMessage = exchange.getIn();
         }
 
-        Map<String, Object> headers = camelMesssage.getHeaders();
+        Map<String, Object> headers = camelMessage.getHeaders();
 
         /**
          * We purposely don't want to support injecting messages half-way 
through
          * broker processing - use the activemq camel component for that - but
          * we will support changing message headers and destinations
          */
-        if (camelMesssage instanceof JmsMessage) {
-            JmsMessage jmsMessage = (JmsMessage) camelMesssage;
+        if (camelMessage instanceof JmsMessage) {
+            JmsMessage jmsMessage = (JmsMessage) camelMessage;
             if (jmsMessage.getJmsMessage() instanceof ActiveMQMessage) {
                 result = (ActiveMQMessage) jmsMessage.getJmsMessage();
                 //lets apply any new message headers
                 setJmsHeaders(result, headers);
             } else {
-
-                throw new IllegalStateException("not the original message from the broker " + jmsMessage.getJmsMessage());
+                throw new IllegalStateException("Not the original message from the broker " + jmsMessage.getJmsMessage());
             }
         } else {
-            throw new IllegalStateException("not the original message from the broker " + camelMesssage);
+            throw new IllegalStateException("Not the original message from the broker " + camelMessage);
         }
+
         return result;
     }
 
@@ -154,6 +137,6 @@ public class BrokerProducer extends DefaultAsyncProducer {
                 }
             }
         }
-
     }
+
 }

Reply via email to