Author: lhein
Date: Tue Nov 2 11:47:41 2010
New Revision: 1030007
URL: http://svn.apache.org/viewvc?rev=1030007&view=rev
Log:
Separate executors are now used for consumer and provider endpoints (see SM-2007).
Modified:
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsJcaConsumerEndpoint.java
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java
servicemix/components/engines/servicemix-wsn2005/trunk/src/main/java/org/apache/servicemix/wsn/component/WSNComponent.java
servicemix/components/shared-libraries/trunk/pom.xml
servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseComponent.java
servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceMixComponent.java
servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/endpoints/PollingEndpoint.java
Modified:
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsJcaConsumerEndpoint.java
URL:
http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsJcaConsumerEndpoint.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
---
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsJcaConsumerEndpoint.java
(original)
+++
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsJcaConsumerEndpoint.java
Tue Nov 2 11:47:41 2010
@@ -16,26 +16,22 @@
*/
package org.apache.servicemix.jms.endpoints;
-import java.util.Timer;
+import org.apache.servicemix.executors.Executor;
+import org.apache.servicemix.executors.WorkManagerWrapper;
+import org.apache.servicemix.jms.JmsEndpointType;
+import org.jencks.SingletonEndpointFactory;
+import
org.springframework.jms.listener.adapter.ListenerExecutionFailedException;
import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.MessageExchange;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
-import javax.resource.spi.ActivationSpec;
-import javax.resource.spi.BootstrapContext;
-import javax.resource.spi.ResourceAdapter;
-import javax.resource.spi.UnavailableException;
-import javax.resource.spi.XATerminator;
+import javax.resource.spi.*;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.WorkManager;
import javax.transaction.TransactionManager;
-
-import org.apache.servicemix.executors.Executor;
-import org.apache.servicemix.executors.WorkManagerWrapper;
-import org.apache.servicemix.jms.JmsEndpointType;
-import org.jencks.SingletonEndpointFactory;
-import
org.springframework.jms.listener.adapter.ListenerExecutionFailedException;
+import java.util.Timer;
/**
* A Spring-based JMS consumer that uses JCA to connect to the JMS provider
@@ -107,7 +103,7 @@ public class JmsJcaConsumerEndpoint exte
public synchronized void start() throws Exception {
if (bootstrapContext == null) {
- Executor executor = getServiceUnit().getComponent().getExecutor();
+ Executor executor =
getServiceUnit().getComponent().getExecutor(MessageExchange.Role.CONSUMER);
WorkManager wm = new WorkManagerWrapper(executor);
bootstrapContext = new SimpleBootstrapContext(wm);
}
Modified:
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java
URL:
http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
---
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java
(original)
+++
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java
Tue Nov 2 11:47:41 2010
@@ -16,23 +16,18 @@
*/
package org.apache.servicemix.jms.multiplexing;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import org.apache.servicemix.jms.AbstractJmsProcessor;
+import org.apache.servicemix.jms.JmsEndpoint;
+import org.apache.servicemix.soap.Context;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
+import javax.jms.*;
import javax.naming.InitialContext;
-
-import org.apache.servicemix.jms.AbstractJmsProcessor;
-import org.apache.servicemix.jms.JmsEndpoint;
-import org.apache.servicemix.soap.Context;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.lang.IllegalStateException;
public class MultiplexingConsumerProcessor extends AbstractJmsProcessor
implements MessageListener {
@@ -85,7 +80,7 @@ public class MultiplexingConsumerProcess
if (log.isDebugEnabled()) {
log.debug("Received jms message " + message);
}
- endpoint.getServiceUnit().getComponent().getExecutor().execute(new
Runnable() {
+
endpoint.getServiceUnit().getComponent().getExecutor(MessageExchange.Role.CONSUMER).execute(new
Runnable() {
public void run() {
try {
if (log.isDebugEnabled()) {
Modified:
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
URL:
http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
---
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
(original)
+++
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
Tue Nov 2 11:47:41 2010
@@ -16,28 +16,16 @@
*/
package org.apache.servicemix.jms.multiplexing;
-import java.io.IOException;
-
-import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.Fault;
-import javax.jbi.messaging.InOnly;
-import javax.jbi.messaging.InOut;
-import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
-import javax.jbi.messaging.NormalizedMessage;
-import javax.jbi.messaging.RobustInOnly;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.naming.InitialContext;
-
import org.apache.servicemix.jms.AbstractJmsProcessor;
import org.apache.servicemix.jms.JmsEndpoint;
import org.apache.servicemix.soap.SoapFault;
import org.apache.servicemix.soap.marshalers.SoapMessage;
+import javax.jbi.messaging.*;
+import javax.jms.*;
+import javax.naming.InitialContext;
+import java.lang.IllegalStateException;
+
public class MultiplexingProviderProcessor extends AbstractJmsProcessor
implements MessageListener {
@@ -77,7 +65,7 @@ public class MultiplexingProviderProcess
if (log.isDebugEnabled()) {
log.debug("Received jms message " + message);
}
- endpoint.getServiceUnit().getComponent().getExecutor().execute(new
Runnable() {
+
endpoint.getServiceUnit().getComponent().getExecutor(MessageExchange.Role.PROVIDER).execute(new
Runnable() {
public void run() {
InOut exchange = null;
if (log.isDebugEnabled()) {
Modified:
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java
URL:
http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
---
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java
(original)
+++
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/standard/StandardConsumerProcessor.java
Tue Nov 2 11:47:41 2010
@@ -16,20 +16,17 @@
*/
package org.apache.servicemix.jms.standard;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.MessageExchange;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.naming.InitialContext;
-
import org.apache.servicemix.jms.AbstractJmsProcessor;
import org.apache.servicemix.jms.JmsEndpoint;
import org.apache.servicemix.soap.Context;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+import javax.jms.*;
+import javax.naming.InitialContext;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.lang.IllegalStateException;
+
public class StandardConsumerProcessor extends AbstractJmsProcessor {
protected Session session;
@@ -53,7 +50,7 @@ public class StandardConsumerProcessor e
protected void doStart() throws Exception {
synchronized (running) {
- endpoint.getServiceUnit().getComponent().getExecutor().execute(new
Runnable() {
+
endpoint.getServiceUnit().getComponent().getExecutor(MessageExchange.Role.CONSUMER).execute(new
Runnable() {
public void run() {
StandardConsumerProcessor.this.poll();
}
Modified:
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java
URL:
http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
---
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java
(original)
+++
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java
Tue Nov 2 11:47:41 2010
@@ -16,22 +16,16 @@
*/
package org.apache.servicemix.eip.patterns;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.servicemix.eip.support.resequence.*;
+import org.apache.servicemix.executors.Executor;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
-
-import org.apache.servicemix.eip.support.resequence.DefaultComparator;
-import org.apache.servicemix.eip.support.resequence.ResequencerBase;
-import org.apache.servicemix.eip.support.resequence.ResequencerEngine;
-import org.apache.servicemix.eip.support.resequence.SequenceElementComparator;
-import org.apache.servicemix.eip.support.resequence.SequenceReader;
-import org.apache.servicemix.eip.support.resequence.SequenceSender;
-import org.apache.servicemix.executors.Executor;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* This pattern implements the <a
href="http://www.enterpriseintegrationpatterns.com/Resequencer.html">Resequencer</a>
EIP
@@ -93,7 +87,7 @@ public class Resequencer extends Reseque
public void start() throws Exception {
super.start();
if (executor == null) {
- executor = getServiceUnit().getComponent().getExecutor();
+ executor =
getServiceUnit().getComponent().getExecutor(MessageExchange.Role.CONSUMER);
}
BlockingQueue<MessageExchange> queue = new
LinkedBlockingQueue<MessageExchange>();
reseq = new ResequencerEngine<MessageExchange>(comparator, capacity);
Modified:
servicemix/components/engines/servicemix-wsn2005/trunk/src/main/java/org/apache/servicemix/wsn/component/WSNComponent.java
URL:
http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-wsn2005/trunk/src/main/java/org/apache/servicemix/wsn/component/WSNComponent.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
---
servicemix/components/engines/servicemix-wsn2005/trunk/src/main/java/org/apache/servicemix/wsn/component/WSNComponent.java
(original)
+++
servicemix/components/engines/servicemix-wsn2005/trunk/src/main/java/org/apache/servicemix/wsn/component/WSNComponent.java
Tue Nov 2 11:47:41 2010
@@ -16,13 +16,18 @@
*/
package org.apache.servicemix.wsn.component;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import com.ibm.wsdl.Constants;
+import org.apache.servicemix.common.*;
+import org.apache.servicemix.common.tools.wsdl.WSDLFlattener;
+import org.apache.servicemix.wsn.EndpointManager;
+import org.apache.servicemix.wsn.EndpointRegistrationException;
+import org.apache.servicemix.wsn.jbi.JbiNotificationBroker;
+import org.apache.servicemix.wsn.jms.JmsCreatePullPoint;
+import org.springframework.core.io.Resource;
+import org.w3c.dom.Document;
+import org.xml.sax.InputSource;
+import javax.jbi.messaging.MessageExchange;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.jms.ConnectionFactory;
import javax.naming.Context;
@@ -30,28 +35,11 @@ import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.wsdl.Definition;
import javax.wsdl.factory.WSDLFactory;
-import javax.wsdl.xml.WSDLReader;
import javax.wsdl.xml.WSDLLocator;
+import javax.wsdl.xml.WSDLReader;
import javax.xml.namespace.QName;
-
-import org.w3c.dom.Document;
-
-import com.ibm.wsdl.Constants;
-
-import org.apache.servicemix.common.BaseServiceUnitManager;
-import org.apache.servicemix.common.DefaultComponent;
-import org.apache.servicemix.common.Deployer;
-import org.apache.servicemix.common.Endpoint;
-import org.apache.servicemix.common.EndpointSupport;
-import org.apache.servicemix.common.ServiceUnit;
-import org.apache.servicemix.common.DefaultServiceUnit;
-import org.apache.servicemix.common.tools.wsdl.WSDLFlattener;
-import org.apache.servicemix.wsn.EndpointManager;
-import org.apache.servicemix.wsn.EndpointRegistrationException;
-import org.apache.servicemix.wsn.jbi.JbiNotificationBroker;
-import org.apache.servicemix.wsn.jms.JmsCreatePullPoint;
-import org.springframework.core.io.Resource;
-import org.xml.sax.InputSource;
+import java.net.URL;
+import java.util.*;
public class WSNComponent extends DefaultComponent {
@@ -288,7 +276,7 @@ public class WSNComponent extends Defaul
}
public void unregister(final Object endpoint) throws
EndpointRegistrationException {
- WSNComponent.this.getExecutor().execute(new Runnable() {
+
WSNComponent.this.getExecutor(MessageExchange.Role.CONSUMER).execute(new
Runnable() {
public void run() {
try {
Endpoint ep = (Endpoint) endpoint;
Modified: servicemix/components/shared-libraries/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/servicemix/components/shared-libraries/trunk/pom.xml?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
--- servicemix/components/shared-libraries/trunk/pom.xml (original)
+++ servicemix/components/shared-libraries/trunk/pom.xml Tue Nov 2 11:47:41 2010
@@ -41,7 +41,7 @@
</scm>
<properties>
- <servicemix-version>3.3</servicemix-version>
+ <servicemix-version>3.3.3-SNAPSHOT</servicemix-version>
<activemq-version>5.4.1</activemq-version>
<servicemix.osgi.export>${servicemix.osgi.export.pkg}*;version=${project.version}</servicemix.osgi.export>
<servicemix.osgi.export.pkg />
Modified:
servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
URL:
http://svn.apache.org/viewvc/servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
---
servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
(original)
+++
servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
Tue Nov 2 11:47:41 2010
@@ -43,6 +43,7 @@ import org.apache.commons.logging.Log;
import org.apache.servicemix.executors.Executor;
import org.apache.servicemix.executors.ExecutorFactory;
import org.apache.servicemix.executors.impl.ExecutorFactoryImpl;
+import org.apache.servicemix.jbi.container.JBIContainer;
/**
* Base class for life cycle management of components. This class may be used
as
@@ -66,7 +67,9 @@ public class AsyncBaseLifeCycle implemen
protected ExecutorFactory executorFactory;
- protected Executor executor;
+ protected Executor consumerExecutor;
+
+ protected Executor providerExecutor;
protected AtomicBoolean running;
@@ -241,7 +244,8 @@ public class AsyncBaseLifeCycle implemen
if (this.executorFactory == null) {
this.executorFactory = createExecutorFactory();
}
- this.executor = this.executorFactory.createExecutor("component." +
getContext().getComponentName());
+ this.consumerExecutor =
this.executorFactory.createExecutor("component." +
getContext().getComponentName() + ".consumer");
+ this.providerExecutor =
this.executorFactory.createExecutor("component." +
getContext().getComponentName() + ".provider");
}
/*
@@ -281,8 +285,10 @@ public class AsyncBaseLifeCycle implemen
}
}
// Destroy excutor
- executor.shutdown();
- executor = null;
+ consumerExecutor.shutdown();
+ providerExecutor.shutdown();
+ consumerExecutor = null;
+ providerExecutor = null;
}
/*
@@ -312,9 +318,20 @@ public class AsyncBaseLifeCycle implemen
}
protected void doStart() throws Exception {
+ boolean doPoll = false;
+
if (container.getType() != Container.Type.ServiceMix3) {
+ doPoll = true;
+ } else {
+ Object smx3container =
((Container.Smx3Container)container).getSmx3Container();
+ if (smx3container instanceof JBIContainer) {
+ doPoll = ((JBIContainer)smx3container).isOptimizedDelivery()
== false;
+ }
+ }
+
+ if (doPoll) {
synchronized (this.polling) {
- executor.execute(new Runnable() {
+ consumerExecutor.execute(new Runnable() {
public void run() {
poller = Thread.currentThread();
pollDeliveryChannel();
@@ -330,10 +347,12 @@ public class AsyncBaseLifeCycle implemen
polling.set(true);
polling.notify();
}
+ Executor executor = null;
while (running.get()) {
try {
final MessageExchange exchange = channel.accept(1000L);
if (exchange != null) {
+ executor = exchange.getRole().equals(Role.CONSUMER) ?
consumerExecutor : providerExecutor;
final Transaction tx = (Transaction) exchange
.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
if (tx != null && container.handleTransactions()) {
@@ -413,8 +432,13 @@ public class AsyncBaseLifeCycle implemen
return context;
}
- public Executor getExecutor() {
- return executor;
+ public Executor getExecutor(Role role) {
+ if (role != null && role.equals(Role.CONSUMER)) {
+ return this.consumerExecutor;
+ } else if (role != null && role.equals(Role.PROVIDER)) {
+ return this.providerExecutor;
+ }
+ return null;
}
/**
@@ -424,8 +448,12 @@ public class AsyncBaseLifeCycle implemen
* @param executor
* @see #setExecutorFactory(ExecutorFactory)
*/
- public void setExecutor(Executor executor) {
- this.executor = executor;
+ public void setExecutor(Role role, Executor executor) {
+ if (role != null && role.equals(Role.CONSUMER)) {
+ this.consumerExecutor = executor;
+ } else {
+ this.providerExecutor = executor;
+ }
}
public ExecutorFactory getExecutorFactory() {
@@ -438,7 +466,7 @@ public class AsyncBaseLifeCycle implemen
* is deployed into ServiceMix 3.x, or a default implementation will be
used.
*
* @param executorFactory
- * @see #setExecutor(Executor)
+ * @see #setExecutor(Role, Executor)
*/
public void setExecutorFactory(ExecutorFactory executorFactory) {
this.executorFactory = executorFactory;
@@ -482,7 +510,7 @@ public class AsyncBaseLifeCycle implemen
}
}
if (oldStatus == ExchangeStatus.ACTIVE) {
- exchange.setStatus(ExchangeStatus.ERROR);
+ exchange.setStatus(ExchangeStatus.ERROR);
exchange.setError(t instanceof Exception ? (Exception) t :
new Exception(t));
channel.send(exchange);
}
@@ -523,6 +551,7 @@ public class AsyncBaseLifeCycle implemen
processExchangeInTx(exchange, tx);
return;
}
+
ExchangeStatus oldStatus = exchange.getStatus();
try {
processExchange(exchange);
Modified:
servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseComponent.java
URL:
http://svn.apache.org/viewvc/servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseComponent.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
---
servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseComponent.java
(original)
+++
servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/BaseComponent.java
Tue Nov 2 11:47:41 2010
@@ -187,8 +187,8 @@ public abstract class BaseComponent impl
*
* @return the executor for this component
*/
- public Executor getExecutor() {
- return lifeCycle.getExecutor();
+ public Executor getExecutor(Role role) {
+ return lifeCycle.getExecutor(role);
}
public Object getSmx3Container() {
Modified:
servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceMixComponent.java
URL:
http://svn.apache.org/viewvc/servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceMixComponent.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
---
servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceMixComponent.java
(original)
+++
servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceMixComponent.java
Tue Nov 2 11:47:41 2010
@@ -44,9 +44,10 @@ public interface ServiceMixComponent ext
public Registry getRegistry();
/**
+ * @param role the role to use
* @return Returns the executor for this component
*/
- public Executor getExecutor();
+ public Executor getExecutor(MessageExchange.Role role);
/**
* @return Returns the components context
Modified:
servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/endpoints/PollingEndpoint.java
URL:
http://svn.apache.org/viewvc/servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/endpoints/PollingEndpoint.java?rev=1030007&r1=1030006&r2=1030007&view=diff
==============================================================================
---
servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/endpoints/PollingEndpoint.java
(original)
+++
servicemix/components/shared-libraries/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/endpoints/PollingEndpoint.java
Tue Nov 2 11:47:41 2010
@@ -18,8 +18,8 @@ package org.apache.servicemix.common.end
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
-
import javax.jbi.JBIException;
+import javax.jbi.messaging.MessageExchange;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.QName;
@@ -154,7 +154,7 @@ public abstract class PollingEndpoint ex
}
if (executor == null) {
- executor = getServiceUnit().getComponent().getExecutor();
+ executor =
getServiceUnit().getComponent().getExecutor(MessageExchange.Role.CONSUMER);
}
if (schedulerTask != null) {
schedulerTask.cancel();