Author: gnodet
Date: Wed Feb 11 16:54:25 2009
New Revision: 743383

URL: http://svn.apache.org/viewvc?rev=743383&view=rev
Log:
SMX4NMR-71: When a component sends a new exchange, the DeliveryChannel should 
select the NMR channel corresponding to the sender endpoint instead of the one 
for the component's endpoint

Modified:
    
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java

Modified: 
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java?rev=743383&r1=743382&r2=743383&view=diff
==============================================================================
--- 
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java
 (original)
+++ 
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java
 Wed Feb 11 16:54:25 2009
@@ -18,6 +18,7 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,7 +37,9 @@
 import org.apache.servicemix.nmr.api.Pattern;
 import org.apache.servicemix.nmr.api.Reference;
 import org.apache.servicemix.nmr.api.NMR;
+import org.apache.servicemix.nmr.api.service.ServiceHelper;
 import org.apache.servicemix.nmr.api.internal.InternalExchange;
+import org.apache.servicemix.nmr.api.internal.InternalEndpoint;
 
 /**
  * Implementation of the DeliveryChannel.
@@ -44,7 +47,9 @@
  */
 public class DeliveryChannelImpl implements DeliveryChannel {
 
-    public static final java.lang.String SEND_SYNC = 
"javax.jbi.messaging.sendSync";
+    public static final String SEND_SYNC = "javax.jbi.messaging.sendSync";
+
+    private static final String SENDER_ENDPOINT = 
"org.apache.servicemix.senderEndpoint";
 
     /** Mutable boolean indicating if the channel has been closed */
     private final AtomicBoolean closed;
@@ -153,10 +158,8 @@
         if (((InternalExchange) exchange).getDestination() != null && 
me.getEndpoint() == null) {
             Endpoint ep = ((InternalExchange) exchange).getDestination();
             Map<String, ?> props = 
context.getNmr().getEndpointRegistry().getProperties(ep);
-            QName serviceName = QName.valueOf((String) 
props.get(Endpoint.SERVICE_NAME));
-            if (serviceName == null) {
-                serviceName = DEFAULT_SERVICE_NAME;
-            }
+            String strSvcName = (String) props.get(Endpoint.SERVICE_NAME);
+            QName serviceName = (strSvcName != null && strSvcName.length() > 
0) ? QName.valueOf(strSvcName) : DEFAULT_SERVICE_NAME;
             String endpointName = (String) props.get(Endpoint.ENDPOINT_NAME);
             if (endpointName == null) {
                 endpointName = (String) props.get(Endpoint.NAME);
@@ -171,7 +174,29 @@
         createTarget(context.getNmr(), exchange);
         exchange.setProperty(SEND_SYNC, null);
         ((MessageExchangeImpl) exchange).afterSend();
-        channel.send(((MessageExchangeImpl) exchange).getInternalExchange());
+        InternalExchange ie = (InternalExchange) ((MessageExchangeImpl) 
exchange).getInternalExchange();
+        // Default to this delivery channel's own NMR channel; a more specific one may be selected below
+        Channel channelToUse = channel;
+        if (ie.getSource() == null) {
+            // We need to look up the channel corresponding to the sender 
endpoint
+            try {
+                String sender = (String) exchange.getProperty(SENDER_ENDPOINT);
+                if (sender != null) {
+                    int idx = sender.lastIndexOf(':');
+                    String svc = sender.substring(0, idx);
+                    String ep = sender.substring(idx + 1);
+                    List<Endpoint> eps = 
channel.getNMR().getEndpointRegistry().query(ServiceHelper.createMap(Endpoint.SERVICE_NAME,
 svc, Endpoint.ENDPOINT_NAME, ep));
+                    if (eps != null && eps.size() == 1) {
+                        channelToUse = ((InternalEndpoint) 
eps.get(0)).getChannel();
+                    }
+                }
+            } catch (Throwable t) {
+                // Ignore
+            }
+        } else {
+            channelToUse = ie.getSource().getChannel();
+        }
+        channelToUse.send(ie);
     }
 
     public boolean sendSync(MessageExchange exchange) throws 
MessagingException {


Reply via email to