Author: jstrachan
Date: Wed Oct 22 02:13:04 2008
New Revision: 706913
URL: http://svn.apache.org/viewvc?rev=706913&view=rev
Log:
Minor code improvements: null-check the queue to guard against null pointer
exceptions, and restructure the boolean branching so that the same expression is not tested twice.
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=706913&r1=706912&r2=706913&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Wed Oct 22 02:13:04 2008
@@ -17,6 +17,7 @@
package org.apache.camel.component.seda;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.BlockingQueue;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
@@ -51,32 +52,35 @@
}
public void run() {
- while (isRunAllowed()) {
+ BlockingQueue<Exchange> queue = endpoint.getQueue();
+ while (queue != null && isRunAllowed()) {
final Exchange exchange;
try {
- exchange = endpoint.getQueue().poll(1000, TimeUnit.MILLISECONDS);
+ exchange = queue.poll(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Interupted: " + e, e);
}
continue;
}
- if (exchange != null && isRunAllowed()) {
- try {
- processor.process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
+ if (exchange != null) {
+ if (isRunAllowed()) {
+ try {
+ processor.process(exchange, new AsyncCallback() {
+ public void done(boolean sync) {
+ }
+ });
+ } catch (Exception e) {
+ LOG.error("Seda queue caught: " + e, e);
+ }
+ } else {
+ LOG.warn("This consumer is stopped during polling an exchange, so putting it back on the seda queue: " + exchange);
+ try {
+ queue.put(exchange);
+ } catch (InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Interupted: " + e, e);
}
- });
- } catch (Exception e) {
- LOG.error("Seda queue caught: " + e, e);
- }
- } else if (exchange != null) {
- LOG.warn("This consumer is stopped during polling an exchange, so putting it back on the seda queue: " + exchange);
- try {
- endpoint.getQueue().put(exchange);
- } catch (InterruptedException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Interupted: " + e, e);
}
}
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=706913&r1=706912&r2=706913&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Wed Oct 22 02:13:04 2008
@@ -26,6 +26,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
+import org.apache.camel.util.ObjectHelper;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.BrowsableEndpoint;
@@ -50,6 +51,7 @@
public SedaEndpoint(String endpointUri, BlockingQueue<Exchange> queue) {
super(endpointUri);
+ ObjectHelper.notNull(queue, "queue");
this.queue = queue;
}