Author: gnodet
Date: Wed Feb 11 16:54:25 2009
New Revision: 743383
URL: http://svn.apache.org/viewvc?rev=743383&view=rev
Log:
SMX4NMR-71: When a component sends a new exchange, the DeliveryChannel should
select the NMR channel corresponding to this endpoint instead of the one for
the component's endpoint
Modified:
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java
Modified:
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java
URL:
http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java?rev=743383&r1=743382&r2=743383&view=diff
==============================================================================
---
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java
(original)
+++
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java
Wed Feb 11 16:54:25 2009
@@ -18,6 +18,7 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,7 +37,9 @@
import org.apache.servicemix.nmr.api.Pattern;
import org.apache.servicemix.nmr.api.Reference;
import org.apache.servicemix.nmr.api.NMR;
+import org.apache.servicemix.nmr.api.service.ServiceHelper;
import org.apache.servicemix.nmr.api.internal.InternalExchange;
+import org.apache.servicemix.nmr.api.internal.InternalEndpoint;
/**
* Implementation of the DeliveryChannel.
@@ -44,7 +47,9 @@
*/
public class DeliveryChannelImpl implements DeliveryChannel {
- public static final java.lang.String SEND_SYNC =
"javax.jbi.messaging.sendSync";
+ public static final String SEND_SYNC = "javax.jbi.messaging.sendSync";
+
+ private static final String SENDER_ENDPOINT =
"org.apache.servicemix.senderEndpoint";
/** Mutable boolean indicating if the channel has been closed */
private final AtomicBoolean closed;
@@ -153,10 +158,8 @@
if (((InternalExchange) exchange).getDestination() != null &&
me.getEndpoint() == null) {
Endpoint ep = ((InternalExchange) exchange).getDestination();
Map<String, ?> props =
context.getNmr().getEndpointRegistry().getProperties(ep);
- QName serviceName = QName.valueOf((String)
props.get(Endpoint.SERVICE_NAME));
- if (serviceName == null) {
- serviceName = DEFAULT_SERVICE_NAME;
- }
+ String strSvcName = (String) props.get(Endpoint.SERVICE_NAME);
+ QName serviceName = (strSvcName != null && strSvcName.length() >
0) ? QName.valueOf(strSvcName) : DEFAULT_SERVICE_NAME;
String endpointName = (String) props.get(Endpoint.ENDPOINT_NAME);
if (endpointName == null) {
endpointName = (String) props.get(Endpoint.NAME);
@@ -171,7 +174,29 @@
createTarget(context.getNmr(), exchange);
exchange.setProperty(SEND_SYNC, null);
((MessageExchangeImpl) exchange).afterSend();
- channel.send(((MessageExchangeImpl) exchange).getInternalExchange());
+ InternalExchange ie = (InternalExchange) ((MessageExchangeImpl)
exchange).getInternalExchange();
+ // Choose the channel to send on, defaulting to this DeliveryChannel's own channel
+ Channel channelToUse = channel;
+ if (ie.getSource() == null) {
+ // We need to look up the channel corresponding to the sender
endpoint
+ try {
+ String sender = (String) exchange.getProperty(SENDER_ENDPOINT);
+ if (sender != null) {
+ int idx = sender.lastIndexOf(':');
+ String svc = sender.substring(0, idx);
+ String ep = sender.substring(idx + 1);
+ List<Endpoint> eps =
channel.getNMR().getEndpointRegistry().query(ServiceHelper.createMap(Endpoint.SERVICE_NAME,
svc, Endpoint.ENDPOINT_NAME, ep));
+ if (eps != null && eps.size() == 1) {
+ channelToUse = ((InternalEndpoint)
eps.get(0)).getChannel();
+ }
+ }
+ } catch (Throwable t) {
+ // Ignore: if the lookup fails, fall back to the default channel
+ }
+ } else {
+ channelToUse = ie.getSource().getChannel();
+ }
+ channelToUse.send(ie);
}
public boolean sendSync(MessageExchange exchange) throws
MessagingException {