Author: gnodet
Date: Fri Feb 13 14:58:29 2009
New Revision: 744136
URL: http://svn.apache.org/viewvc?rev=744136&view=rev
Log:
SMX4NMR-56: fix a few problems related to external endpoints
Modified:
servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/AbstractComponentContext.java
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java
servicemix/smx4/nmr/trunk/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java
Modified:
servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java
URL:
http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java?rev=744136&r1=744135&r2=744136&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java Fri Feb 13 14:58:29 2009
@@ -48,6 +48,7 @@
import org.apache.servicemix.jbi.runtime.impl.MessageExchangeImpl;
import org.apache.servicemix.jbi.runtime.impl.ServiceEndpointImpl;
import org.apache.servicemix.jbi.runtime.impl.DeliveryChannelImpl;
+import org.apache.servicemix.jbi.runtime.impl.AbstractComponentContext;
import org.apache.servicemix.jbi.cluster.requestor.JmsRequestor;
import org.apache.servicemix.jbi.cluster.requestor.Transacted;
import org.apache.servicemix.jbi.cluster.requestor.JmsRequestorListener;
@@ -522,7 +523,8 @@
for (Endpoint ep : registry.getServices()) {
Map<String,?> props = registry.getProperties(ep);
// Check if this endpoint is addressable in the JBI space
- if (props.get(Endpoint.SERVICE_NAME) != null && props.get(Endpoint.ENDPOINT_NAME) != null) {
+ if (props.get(Endpoint.SERVICE_NAME) != null && props.get(Endpoint.ENDPOINT_NAME) != null
+ && props.get(AbstractComponentContext.INTERNAL_ENDPOINT) != null) {
endpoints.add(new ServiceEndpointImpl(props));
}
}
Modified:
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/AbstractComponentContext.java
URL:
http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/AbstractComponentContext.java?rev=744136&r1=744135&r2=744136&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/AbstractComponentContext.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/AbstractComponentContext.java Fri Feb 13 14:58:29 2009
@@ -75,6 +75,7 @@
Map<String, Object> props = new HashMap<String, Object>();
props.put(Endpoint.SERVICE_NAME, serviceName.toString());
props.put(Endpoint.ENDPOINT_NAME, endpointName);
+ props.put(INTERNAL_ENDPOINT, Boolean.TRUE.toString());
List<Endpoint> endpoints = getNmr().getEndpointRegistry().query(props);
if (endpoints.isEmpty()) {
return null;
@@ -118,19 +119,15 @@
}
protected ServiceEndpoint[] queryInternalEndpoints(Map<String, Object> props) {
- return doQueryEndpoints(props, false);
+ return doQueryServiceEndpoints(props, false);
}
protected ServiceEndpoint[] queryExternalEndpoints(Map<String, Object> props) {
- return doQueryEndpoints(props, true);
+ return doQueryServiceEndpoints(props, true);
}
- protected ServiceEndpoint[] doQueryEndpoints(Map<String, Object> props, boolean external) {
- if (props == null) {
- props = new HashMap<String, Object>();
- }
- props.put(external ? EXTERNAL_ENDPOINT : INTERNAL_ENDPOINT, Boolean.TRUE.toString());
- List<Endpoint> endpoints = getNmr().getEndpointRegistry().query(props);
+ protected ServiceEndpoint[] doQueryServiceEndpoints(Map<String, Object> props, boolean external) {
+ List<Endpoint> endpoints = doQueryEndpoints(props, external);
List<ServiceEndpoint> ses = new ArrayList<ServiceEndpoint>();
for (Endpoint endpoint : endpoints) {
ServiceEndpoint se = getServiceEndpoint(endpoint);
@@ -141,6 +138,15 @@
return ses.toArray(new ServiceEndpoint[ses.size()]);
}
+ protected List<Endpoint> doQueryEndpoints(Map<String, Object> props, boolean external) {
+ if (props == null) {
+ props = new HashMap<String, Object>();
+ }
+ props.put(external ? EXTERNAL_ENDPOINT : INTERNAL_ENDPOINT, Boolean.TRUE.toString());
+ List<Endpoint> endpoints = getNmr().getEndpointRegistry().query(props);
+ return endpoints;
+ }
+
protected ServiceEndpoint getServiceEndpoint(Endpoint endpoint) {
if (endpoint instanceof InternalEndpoint) {
endpoint = ((InternalEndpoint) endpoint).getEndpoint();
Modified:
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java
URL:
http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java?rev=744136&r1=744135&r2=744136&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/ComponentContextImpl.java Fri Feb 13 14:58:29 2009
@@ -19,6 +19,7 @@
import java.io.File;
import java.util.HashMap;
import java.util.Map;
+import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -125,9 +126,15 @@
}
public synchronized void deactivateEndpoint(ServiceEndpoint endpoint) throws JBIException {
- // TODO: retrieve the correct endpoint if needed
- EndpointImpl ep = (EndpointImpl) endpoint;
- componentRegistry.getNmr().getEndpointRegistry().unregister(ep, null);
+ List<Endpoint> eps = doQueryEndpoints(ServiceHelper.createMap(Endpoint.SERVICE_NAME,
+ endpoint.getServiceName().toString(),
+ Endpoint.ENDPOINT_NAME,
+ endpoint.getEndpointName()),
+ false);
+ if (eps != null && eps.size() == 1) {
+ Endpoint ep = eps.get(0);
+ componentRegistry.getNmr().getEndpointRegistry().unregister(ep, null);
+ }
}
public void registerExternalEndpoint(ServiceEndpoint externalEndpoint) throws JBIException {
@@ -166,14 +173,13 @@
}
public void deregisterExternalEndpoint(ServiceEndpoint externalEndpoint) throws JBIException {
- ServiceEndpoint[] ses = doQueryEndpoints(ServiceHelper.createMap(Endpoint.SERVICE_NAME,
- externalEndpoint.getServiceName().toString(),
- Endpoint.ENDPOINT_NAME,
- externalEndpoint.getEndpointName()),
+ List<Endpoint> eps = doQueryEndpoints(ServiceHelper.createMap(Endpoint.SERVICE_NAME,
+ externalEndpoint.getServiceName().toString(),
+ Endpoint.ENDPOINT_NAME,
+ externalEndpoint.getEndpointName()),
true);
- if (ses != null && ses.length == 1) {
- // TODO: retrieve the correct endpoint if needed
- EndpointImpl ep = (EndpointImpl) ses[0];
+ if (eps != null && eps.size() == 1) {
+ Endpoint ep = eps.get(0);
componentRegistry.getNmr().getEndpointRegistry().unregister(ep, null);
}
}
Modified:
servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java
URL:
http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java?rev=744136&r1=744135&r2=744136&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/runtime/src/main/java/org/apache/servicemix/jbi/runtime/impl/DeliveryChannelImpl.java Fri Feb 13 14:58:29 2009
@@ -175,9 +175,30 @@
exchange.setProperty(SEND_SYNC, null);
((MessageExchangeImpl) exchange).afterSend();
InternalExchange ie = (InternalExchange) ((MessageExchangeImpl) exchange).getInternalExchange();
- // If this
+ getChannelToUse(ie).send(ie);
+ }
+
+ public boolean sendSync(MessageExchange exchange) throws MessagingException {
+ assert exchange != null;
+ createTarget(context.getNmr(), exchange);
+ exchange.setProperty(SEND_SYNC, Boolean.TRUE);
+ ((MessageExchangeImpl) exchange).afterSend();
+ InternalExchange ie = (InternalExchange) ((MessageExchangeImpl) exchange).getInternalExchange();
+ return getChannelToUse(ie).sendSync(ie);
+ }
+
+ public boolean sendSync(MessageExchange exchange, long timeout) throws MessagingException {
+ assert exchange != null;
+ createTarget(context.getNmr(), exchange);
+ exchange.setProperty(SEND_SYNC, Boolean.TRUE);
+ ((MessageExchangeImpl) exchange).afterSend();
+ InternalExchange ie = (InternalExchange) ((MessageExchangeImpl) exchange).getInternalExchange();
+ return getChannelToUse(ie).sendSync(ie, timeout);
+ }
+
+ protected Channel getChannelToUse(InternalExchange exchange) {
Channel channelToUse = channel;
- if (ie.getSource() == null) {
+ if (exchange.getSource() == null) {
// We need to look up the channel corresponding to the sender endpoint
try {
String sender = (String) exchange.getProperty(SENDER_ENDPOINT);
@@ -185,7 +206,11 @@
int idx = sender.lastIndexOf(':');
String svc = sender.substring(0, idx);
String ep = sender.substring(idx + 1);
- List<Endpoint> eps = channel.getNMR().getEndpointRegistry().query(ServiceHelper.createMap(Endpoint.SERVICE_NAME, svc, Endpoint.ENDPOINT_NAME, ep));
+ Map<String, Object> props = ServiceHelper.createMap(Endpoint.SERVICE_NAME, svc,
+ Endpoint.ENDPOINT_NAME, ep);
+ // TODO: we may be using the wrong channel if both an internal endpoint and an external endpoint
+ // have been registered with the same svc / ep name
+ List<Endpoint> eps = channel.getNMR().getEndpointRegistry().query(props);
if (eps != null && eps.size() == 1) {
channelToUse = ((InternalEndpoint) eps.get(0)).getChannel();
}
@@ -194,25 +219,9 @@
// Ignore
}
} else {
- channelToUse = ie.getSource().getChannel();
+ channelToUse = exchange.getSource().getChannel();
}
- channelToUse.send(ie);
- }
-
- public boolean sendSync(MessageExchange exchange) throws MessagingException {
- assert exchange != null;
- createTarget(context.getNmr(), exchange);
- exchange.setProperty(SEND_SYNC, Boolean.TRUE);
- ((MessageExchangeImpl) exchange).afterSend();
- return channel.sendSync(((MessageExchangeImpl) exchange).getInternalExchange());
- }
-
- public boolean sendSync(MessageExchange exchange, long timeout) throws MessagingException {
- assert exchange != null;
- createTarget(context.getNmr(), exchange);
- exchange.setProperty(SEND_SYNC, Boolean.TRUE);
- ((MessageExchangeImpl) exchange).afterSend();
- return channel.sendSync(((MessageExchangeImpl) exchange).getInternalExchange(), timeout);
+ return channelToUse;
}
public static void createTarget(NMR nmr, MessageExchange messageExchange) {
@@ -240,6 +249,7 @@
if (props.isEmpty()) {
throw new IllegalStateException("No endpoint, service or interface name specified for routing");
}
+ props.put(AbstractComponentContext.INTERNAL_ENDPOINT, Boolean.TRUE.toString());
Reference target = nmr.getEndpointRegistry().lookup(props);
exchange.setTarget(target);
}
Modified:
servicemix/smx4/nmr/trunk/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java
URL:
http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java?rev=744136&r1=744135&r2=744136&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/runtime/src/test/java/org/apache/servicemix/jbi/runtime/IntegrationTest.java Fri Feb 13 14:58:29 2009
@@ -27,6 +27,7 @@
import org.apache.servicemix.eip.support.ExchangeTarget;
import org.apache.servicemix.jbi.runtime.impl.ComponentContextImpl;
import org.apache.servicemix.jbi.runtime.impl.ComponentRegistryImpl;
+import org.apache.servicemix.jbi.runtime.impl.AbstractComponentContext;
import org.apache.servicemix.nmr.api.Channel;
import org.apache.servicemix.nmr.api.Endpoint;
import org.apache.servicemix.nmr.api.Exchange;
@@ -61,6 +62,7 @@
};
Map<String, Object> props = new HashMap<String, Object>();
props.put(Endpoint.SERVICE_NAME, new QName("target").toString());
+ props.put(AbstractComponentContext.INTERNAL_ENDPOINT, Boolean.TRUE.toString());
smx.getEndpointRegistry().register(tep, props);
EIPComponent eip = new EIPComponent();