Author: davsclaus
Date: Mon Jan 10 14:10:05 2011
New Revision: 1057202
URL: http://svn.apache.org/viewvc?rev=1057202&view=rev
Log:
CAMEL-3523: Optimized seda component to only use logic to keep track on seda
endpoints come and go if multiple consumers option has been enabled.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1057202&r1=1057201&r2=1057202&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
Mon Jan 10 14:10:05 2011
@@ -176,7 +176,7 @@ public class SedaConsumer extends Servic
}
// use a multicast processor to process it
- MulticastProcessor mp = endpoint.getConumserMulticastProcessor();
+ MulticastProcessor mp = endpoint.getConsumerMulticastProcessor();
// and use the asynchronous routing engine to support it
AsyncProcessorHelper.process(mp, exchange, new AsyncCallback() {
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1057202&r1=1057201&r2=1057202&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
Mon Jan 10 14:10:05 2011
@@ -54,7 +54,8 @@ public class SedaEndpoint extends Defaul
private long timeout = 30000;
private volatile Set<SedaProducer> producers = new
CopyOnWriteArraySet<SedaProducer>();
private volatile Set<SedaConsumer> consumers = new
CopyOnWriteArraySet<SedaConsumer>();
- private volatile MulticastProcessor conumserMulticastProcessor;
+ private volatile MulticastProcessor consumerMulticastProcessor;
+ private volatile boolean multicastStarted;
public SedaEndpoint() {
}
@@ -100,30 +101,43 @@ public class SedaEndpoint extends Defaul
return queue;
}
- protected synchronized MulticastProcessor getConumserMulticastProcessor() {
- return conumserMulticastProcessor;
+ protected synchronized MulticastProcessor getConsumerMulticastProcessor()
throws Exception {
+ if (!multicastStarted && consumerMulticastProcessor != null) {
+ // only start it on-demand to avoid starting it during stopping
+ ServiceHelper.startService(consumerMulticastProcessor);
+ multicastStarted = true;
+ }
+ return consumerMulticastProcessor;
}
protected synchronized void updateMulticastProcessor() throws Exception {
- if (conumserMulticastProcessor != null) {
- ServiceHelper.stopService(conumserMulticastProcessor);
+ if (consumerMulticastProcessor != null) {
+ ServiceHelper.stopService(consumerMulticastProcessor);
}
int size = getConsumers().size();
if (size == 0 && multicastExecutor != null) {
- // stop the multicastExecutor
+ // stop the multicast executor as its not needed anymore when size
is zero
getCamelContext().getExecutorServiceStrategy().shutdown(multicastExecutor);
multicastExecutor = null;
}
- if (size == 1 && multicastExecutor == null) {
- multicastExecutor =
getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this,
getEndpointUri() + "(multicast)");
- }
- List<Processor> processors = new ArrayList<Processor>(size);
- for (SedaConsumer consumer : getConsumers()) {
- processors.add(consumer.getProcessor());
+ if (size > 1) {
+ if (multicastExecutor == null) {
+ // create multicast executor as we need it when we have more
than 1 processor
+ multicastExecutor =
getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this,
getEndpointUri() + "(multicast)");
+ }
+ // create list of consumers to multicast to
+ List<Processor> processors = new ArrayList<Processor>(size);
+ for (SedaConsumer consumer : getConsumers()) {
+ processors.add(consumer.getProcessor());
+ }
+ // create multicast processor
+ multicastStarted = false;
+ consumerMulticastProcessor = new
MulticastProcessor(getCamelContext(), processors, null, true,
multicastExecutor, false, false, 0);
+ } else {
+ // not needed
+ consumerMulticastProcessor = null;
}
- conumserMulticastProcessor = new MulticastProcessor(getCamelContext(),
processors, null, true, multicastExecutor, false, false, 0);
- ServiceHelper.startService(conumserMulticastProcessor);
}
public void setQueue(BlockingQueue<Exchange> queue) {
@@ -210,12 +224,16 @@ public class SedaEndpoint extends Defaul
void onStarted(SedaConsumer consumer) throws Exception {
consumers.add(consumer);
- updateMulticastProcessor();
+ if (isMultipleConsumers()) {
+ updateMulticastProcessor();
+ }
}
void onStopped(SedaConsumer consumer) throws Exception {
consumers.remove(consumer);
- updateMulticastProcessor();
+ if (isMultipleConsumers()) {
+ updateMulticastProcessor();
+ }
}
}