Author: gnodet
Date: Fri Feb 13 14:58:29 2009
New Revision: 744136

URL: http://svn.apache.org/viewvc?rev=744136&view=rev
Log:
SMX4NMR-56: fix a few problems related to external endpoints

Modified:
    
servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java
    
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/AbstractComponentContext.java
    
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java
    
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java
    
servicemix/smx4/nmr/trunk/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java

Modified: 
servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java?rev=744136&r1=744135&r2=744136&view=diff
==============================================================================
--- 
servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java
 (original)
+++ 
servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java
 Fri Feb 13 14:58:29 2009
@@ -48,6 +48,7 @@
 import org.apache.servicemix.jbi.runtime.impl.MessageExchangeImpl;
 import org.apache.servicemix.jbi.runtime.impl.ServiceEndpointImpl;
 import org.apache.servicemix.jbi.runtime.impl.DeliveryChannelImpl;
+import org.apache.servicemix.jbi.runtime.impl.AbstractComponentContext;
 import org.apache.servicemix.jbi.cluster.requestor.JmsRequestor;
 import org.apache.servicemix.jbi.cluster.requestor.Transacted;
 import org.apache.servicemix.jbi.cluster.requestor.JmsRequestorListener;
@@ -522,7 +523,8 @@
         for (Endpoint ep : registry.getServices()) {
             Map<String,?> props = registry.getProperties(ep);
             // Check if this endpoint is addressable in the JBI space
-            if (props.get(Endpoint.SERVICE_NAME) != null && 
props.get(Endpoint.ENDPOINT_NAME) != null) {
+            if (props.get(Endpoint.SERVICE_NAME) != null && 
props.get(Endpoint.ENDPOINT_NAME) != null
+                    && props.get(AbstractComponentContext.INTERNAL_ENDPOINT) 
!= null) {
                 endpoints.add(new ServiceEndpointImpl(props));
             }
         }

Modified: 
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/AbstractComponentContext.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/AbstractComponentContext.java?rev=744136&r1=744135&r2=744136&view=diff
==============================================================================
--- 
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/AbstractComponentContext.java
 (original)
+++ 
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/AbstractComponentContext.java
 Fri Feb 13 14:58:29 2009
@@ -75,6 +75,7 @@
         Map<String, Object> props = new HashMap<String, Object>();
         props.put(Endpoint.SERVICE_NAME, serviceName.toString());
         props.put(Endpoint.ENDPOINT_NAME, endpointName);
+        props.put(INTERNAL_ENDPOINT, Boolean.TRUE.toString());
         List<Endpoint> endpoints = getNmr().getEndpointRegistry().query(props);
         if (endpoints.isEmpty()) {
             return null;
@@ -118,19 +119,15 @@
     }
 
     protected ServiceEndpoint[] queryInternalEndpoints(Map<String, Object> 
props) {
-        return doQueryEndpoints(props, false);
+        return doQueryServiceEndpoints(props, false);
     }
 
     protected ServiceEndpoint[] queryExternalEndpoints(Map<String, Object> 
props) {
-        return doQueryEndpoints(props, true);
+        return doQueryServiceEndpoints(props, true);
     }
 
-    protected ServiceEndpoint[] doQueryEndpoints(Map<String, Object> props, 
boolean external) {
-        if (props == null) {
-            props = new HashMap<String, Object>();
-        }
-        props.put(external ? EXTERNAL_ENDPOINT : INTERNAL_ENDPOINT, 
Boolean.TRUE.toString());
-        List<Endpoint> endpoints = getNmr().getEndpointRegistry().query(props);
+    protected ServiceEndpoint[] doQueryServiceEndpoints(Map<String, Object> 
props, boolean external) {
+        List<Endpoint> endpoints = doQueryEndpoints(props, external);
         List<ServiceEndpoint> ses = new ArrayList<ServiceEndpoint>();
         for (Endpoint endpoint : endpoints) {
             ServiceEndpoint se = getServiceEndpoint(endpoint);
@@ -141,6 +138,15 @@
         return ses.toArray(new ServiceEndpoint[ses.size()]);
     }
 
+    protected List<Endpoint> doQueryEndpoints(Map<String, Object> props, 
boolean external) {
+        if (props == null) {
+            props = new HashMap<String, Object>();
+        }
+        props.put(external ? EXTERNAL_ENDPOINT : INTERNAL_ENDPOINT, 
Boolean.TRUE.toString());
+        List<Endpoint> endpoints = getNmr().getEndpointRegistry().query(props);
+        return endpoints;
+    }
+
     protected ServiceEndpoint getServiceEndpoint(Endpoint endpoint) {
         if (endpoint instanceof InternalEndpoint) {
             endpoint = ((InternalEndpoint) endpoint).getEndpoint();

Modified: 
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java?rev=744136&r1=744135&r2=744136&view=diff
==============================================================================
--- 
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java
 (original)
+++ 
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java
 Fri Feb 13 14:58:29 2009
@@ -19,6 +19,7 @@
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
@@ -125,9 +126,15 @@
     }
 
     public synchronized void deactivateEndpoint(ServiceEndpoint endpoint) 
throws JBIException {
-        // TODO: retrieve the correct endpoint if needed
-        EndpointImpl ep = (EndpointImpl) endpoint;
-        componentRegistry.getNmr().getEndpointRegistry().unregister(ep, null);
+        List<Endpoint> eps = 
doQueryEndpoints(ServiceHelper.createMap(Endpoint.SERVICE_NAME,
+                                                                      
endpoint.getServiceName().toString(),
+                                                                      
Endpoint.ENDPOINT_NAME,
+                                                                      
endpoint.getEndpointName()),
+                                              false);
+        if (eps != null && eps.size() == 1) {
+            Endpoint ep = eps.get(0);
+            componentRegistry.getNmr().getEndpointRegistry().unregister(ep, 
null);
+        }
     }
 
     public void registerExternalEndpoint(ServiceEndpoint externalEndpoint) 
throws JBIException {
@@ -166,14 +173,13 @@
     }
 
     public void deregisterExternalEndpoint(ServiceEndpoint externalEndpoint) 
throws JBIException {
-        ServiceEndpoint[] ses = 
doQueryEndpoints(ServiceHelper.createMap(Endpoint.SERVICE_NAME,
-                                                                         
externalEndpoint.getServiceName().toString(),
-                                                                         
Endpoint.ENDPOINT_NAME,
-                                                                         
externalEndpoint.getEndpointName()),
+        List<Endpoint> eps = 
doQueryEndpoints(ServiceHelper.createMap(Endpoint.SERVICE_NAME,
+                                                                      
externalEndpoint.getServiceName().toString(),
+                                                                      
Endpoint.ENDPOINT_NAME,
+                                                                      
externalEndpoint.getEndpointName()),
                                                  true);
-        if (ses != null && ses.length == 1) {
-            // TODO: retrieve the correct endpoint if needed
-            EndpointImpl ep = (EndpointImpl) ses[0];
+        if (eps != null && eps.size() == 1) {
+            Endpoint ep = eps.get(0);
             componentRegistry.getNmr().getEndpointRegistry().unregister(ep, 
null);
         }
     }

Modified: 
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java?rev=744136&r1=744135&r2=744136&view=diff
==============================================================================
--- 
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java
 (original)
+++ 
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java
 Fri Feb 13 14:58:29 2009
@@ -175,9 +175,30 @@
         exchange.setProperty(SEND_SYNC, null);
         ((MessageExchangeImpl) exchange).afterSend();
         InternalExchange ie = (InternalExchange) ((MessageExchangeImpl) 
exchange).getInternalExchange();
-        // If this
+        getChannelToUse(ie).send(ie);
+    }
+
+    public boolean sendSync(MessageExchange exchange) throws 
MessagingException {
+        assert exchange != null;
+        createTarget(context.getNmr(), exchange);
+        exchange.setProperty(SEND_SYNC, Boolean.TRUE);
+        ((MessageExchangeImpl) exchange).afterSend();
+        InternalExchange ie = (InternalExchange) ((MessageExchangeImpl) 
exchange).getInternalExchange();
+        return getChannelToUse(ie).sendSync(ie);
+    }
+
+    public boolean sendSync(MessageExchange exchange, long timeout) throws 
MessagingException {
+        assert exchange != null;
+        createTarget(context.getNmr(), exchange);
+        exchange.setProperty(SEND_SYNC, Boolean.TRUE);
+        ((MessageExchangeImpl) exchange).afterSend();
+        InternalExchange ie = (InternalExchange) ((MessageExchangeImpl) 
exchange).getInternalExchange();
+        return getChannelToUse(ie).sendSync(ie, timeout);
+    }
+
+    protected Channel getChannelToUse(InternalExchange exchange) {
         Channel channelToUse = channel;
-        if (ie.getSource() == null) {
+        if (exchange.getSource() == null) {
             // We need to look up the channel corresponding to the sender 
endpoint
             try {
                 String sender = (String) exchange.getProperty(SENDER_ENDPOINT);
@@ -185,7 +206,11 @@
                     int idx = sender.lastIndexOf(':');
                     String svc = sender.substring(0, idx);
                     String ep = sender.substring(idx + 1);
-                    List<Endpoint> eps = 
channel.getNMR().getEndpointRegistry().query(ServiceHelper.createMap(Endpoint.SERVICE_NAME,
 svc, Endpoint.ENDPOINT_NAME, ep));
+                    Map<String, Object> props = 
ServiceHelper.createMap(Endpoint.SERVICE_NAME, svc,
+                                                                        
Endpoint.ENDPOINT_NAME, ep);
+                    // TODO: we may be using the wrong channel if both an 
internal endpoint and an external endpoint
+                    // have been registered with the same svc / ep name
+                    List<Endpoint> eps = 
channel.getNMR().getEndpointRegistry().query(props);
                     if (eps != null && eps.size() == 1) {
                         channelToUse = ((InternalEndpoint) 
eps.get(0)).getChannel();
                     }
@@ -194,25 +219,9 @@
                 // Ignore
             }
         } else {
-            channelToUse = ie.getSource().getChannel();
+            channelToUse = exchange.getSource().getChannel();
         }
-        channelToUse.send(ie);
-    }
-
-    public boolean sendSync(MessageExchange exchange) throws 
MessagingException {
-        assert exchange != null;
-        createTarget(context.getNmr(), exchange);
-        exchange.setProperty(SEND_SYNC, Boolean.TRUE);
-        ((MessageExchangeImpl) exchange).afterSend();
-        return channel.sendSync(((MessageExchangeImpl) 
exchange).getInternalExchange());
-    }
-
-    public boolean sendSync(MessageExchange exchange, long timeout) throws 
MessagingException {
-        assert exchange != null;
-        createTarget(context.getNmr(), exchange);
-        exchange.setProperty(SEND_SYNC, Boolean.TRUE);
-        ((MessageExchangeImpl) exchange).afterSend();
-        return channel.sendSync(((MessageExchangeImpl) 
exchange).getInternalExchange(), timeout);
+        return channelToUse;
     }
 
     public static void createTarget(NMR nmr, MessageExchange messageExchange) {
@@ -240,6 +249,7 @@
             if (props.isEmpty()) {
                 throw new IllegalStateException("No endpoint, service or 
interface name specified for routing");
             }
+            props.put(AbstractComponentContext.INTERNAL_ENDPOINT, 
Boolean.TRUE.toString());
             Reference target = nmr.getEndpointRegistry().lookup(props);
             exchange.setTarget(target);
         }

Modified: 
servicemix/smx4/nmr/trunk/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java?rev=744136&r1=744135&r2=744136&view=diff
==============================================================================
--- 
servicemix/smx4/nmr/trunk/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java
 (original)
+++ 
servicemix/smx4/nmr/trunk/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java
 Fri Feb 13 14:58:29 2009
@@ -27,6 +27,7 @@
 import org.apache.servicemix.eip.support.ExchangeTarget;
 import org.apache.servicemix.jbi.runtime.impl.ComponentContextImpl;
 import org.apache.servicemix.jbi.runtime.impl.ComponentRegistryImpl;
+import org.apache.servicemix.jbi.runtime.impl.AbstractComponentContext;
 import org.apache.servicemix.nmr.api.Channel;
 import org.apache.servicemix.nmr.api.Endpoint;
 import org.apache.servicemix.nmr.api.Exchange;
@@ -61,6 +62,7 @@
         };
         Map<String, Object> props = new HashMap<String, Object>();
         props.put(Endpoint.SERVICE_NAME, new QName("target").toString());
+        props.put(AbstractComponentContext.INTERNAL_ENDPOINT, 
Boolean.TRUE.toString());
         smx.getEndpointRegistry().register(tep, props);
 
         EIPComponent eip = new EIPComponent();


Reply via email to