Author: gertv
Date: Tue Feb 17 12:46:19 2009
New Revision: 745021

URL: http://svn.apache.org/viewvc?rev=745021&view=rev
Log:
SMXCOMP-20: BeanEndpoint.requests map leaks a request when sending in-only mep 
with seda flow to a TransformBeanSupport-extended bean

Added:
    
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/AbstractBeanComponentTest.java
   (with props)
    
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutSedaTest.java
   (with props)
    
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutTest.java
   (with props)
    
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/TransformBeanSupportSedaFlowTest.java
   (with props)
    
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/support/
    
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/support/RequestTest.java
   (with props)
Modified:
    
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/pom.xml
    
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanComponent.java
    
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
    
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/Request.java
    
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java

Modified: 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/pom.xml
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/pom.xml?rev=745021&r1=745020&r2=745021&view=diff
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/pom.xml
 (original)
+++ 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/pom.xml
 Tue Feb 17 12:46:19 2009
@@ -71,6 +71,12 @@
       <artifactId>commons-jexl</artifactId>
       <version>1.1</version>
     </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>1.2.14</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

Modified: 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanComponent.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanComponent.java?rev=745021&r1=745020&r2=745021&view=diff
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanComponent.java
 (original)
+++ 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanComponent.java
 Tue Feb 17 12:46:19 2009
@@ -42,7 +42,7 @@
  * @org.apache.xbean.XBean element="component" description="Bean Component"
  */
 public class BeanComponent extends DefaultComponent implements 
ApplicationContextAware {
-
+    
     private BeanEndpoint[] endpoints;
     private String[] searchPackages;
     private ApplicationContext applicationContext;

Modified: 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java?rev=745021&r1=745020&r2=745021&view=diff
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
 (original)
+++ 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
 Tue Feb 17 12:46:19 2009
@@ -76,6 +76,11 @@
  * @org.apache.xbean.XBean element="endpoint"
  */
 public class BeanEndpoint extends ProviderEndpoint implements 
ApplicationContextAware {
+    
+    /**
+     * Property name for the correlation id that is being set on exchanges by 
the BeanEndpoint 
+     */
+    public static final String CORRELATION_ID = 
BeanEndpoint.class.getName().replaceAll("\\.", "_") + "_correlation";
 
     private ApplicationContext applicationContext;
     private String beanName;
@@ -217,7 +222,6 @@
     }
 
     protected void onProviderExchange(MessageExchange exchange) throws 
Exception {
-        Object corId = getCorrelation(exchange);
         Request req = getOrCreateCurrentRequest(exchange);
         currentRequest.set(req);
         synchronized (req) {
@@ -258,14 +262,16 @@
                     }
                 }
             }
-            checkEndOfRequest(req, corId);
+            checkEndOfRequest(req);
             currentRequest.set(null);
         }
     }
 
     protected Request getOrCreateCurrentRequest(MessageExchange exchange) 
throws Exception {
-        Object corId = getCorrelation(exchange);
-        Request req = requests.get(corId);
+        if (currentRequest.get() != null) {
+            return currentRequest.get();
+        }
+        Request req = getRequest(exchange);
         if (req == null) {
             Object pojo = getBean();
             if (pojo == null) {
@@ -273,32 +279,40 @@
                 injectBean(pojo);
                 ReflectionUtils.callLifecycleMethod(pojo, PostConstruct.class);
             }
-            req = new Request(pojo, exchange);
-            requests.put(corId, req);
+            req = new Request(getCorrelation(exchange), pojo, exchange);
+            requests.put(req.getCorrelationId(), req);
         }
         return req;
     }
+    
+    protected Request getRequest(MessageExchange exchange) throws 
MessagingException {
+        Object correlation = getCorrelation(exchange);
+        return correlation == null ? null : requests.get(correlation);
+    }
 
     protected void onConsumerExchange(MessageExchange exchange) throws 
Exception {
-        Object corId = exchange.getExchangeId();
-        Request req = requests.remove(corId);
+        Request req = getOrCreateCurrentRequest(exchange);
         if (req == null) {
             throw new IllegalStateException("Receiving unknown consumer 
exchange: " + exchange);
         }
         currentRequest.set(req);
-        // If the bean implements MessageExchangeListener,
-        // just call the method
-        if (req.getBean() instanceof MessageExchangeListener) {
+        
+        // if there's a holder for this exchange, act upon that
+        // else invoke the MessageExchangeListener interface
+        if (exchanges.containsKey(exchange.getExchangeId())) {
+            exchanges.remove(exchange.getExchangeId()).set(exchange);
+            evaluateCallbacks(req);
+            
+            //we should done() the consumer exchange here on behalf of the 
Destination who sent it
+            if (exchange instanceof InOut && 
ExchangeStatus.ACTIVE.equals(exchange.getStatus())) {
+                done(exchange);
+            }
+        } else if (req.getBean() instanceof MessageExchangeListener) {
             ((MessageExchangeListener) 
req.getBean()).onMessageExchange(exchange);
         } else {
-            Holder me = exchanges.get(exchange.getExchangeId());
-            if (me == null) {
-                throw new IllegalStateException("Consumer exchange not found");
-            }
-            me.set(exchange);
-            evaluateCallbacks(req);
+            throw new IllegalStateException("No known consumer exchange found 
and bean does not implement MessageExchangeListener");
         }
-        checkEndOfRequest(req, corId);
+        checkEndOfRequest(req);
         currentRequest.set(null);
     }
 
@@ -404,7 +418,7 @@
             URIResolver.configureExchange(me, 
getServiceUnit().getComponent().getComponentContext(), uri);
             MessageUtil.transferTo(message, me, "in");
             final Holder h = new Holder();
-            requests.put(me.getExchangeId(), currentRequest.get());
+            getOrCreateCurrentRequest(me).addExchange(me);
             exchanges.put(me.getExchangeId(), h);
             BeanEndpoint.this.send(me);
             return h;
@@ -412,16 +426,40 @@
             throw new RuntimeException(e);
         }
     }
+    
+    @Override
+    protected void send(MessageExchange me) throws MessagingException {
+        checkEndOfRequest(me);
+        super.send(me);
+    }
+
+    /*
+     * Checks if the request has ended with the given MessageExchange.  It 
will only perform the check on non-ACTIVE exchanges
+     */
+    private void checkEndOfRequest(MessageExchange me) throws 
MessagingException {
+        if (!ExchangeStatus.ACTIVE.equals(me.getStatus())) {
+            Request request = getRequest(me);
+            if (request != null) {
+                checkEndOfRequest(request);
+            }
+        }
+    }
 
-    protected void checkEndOfRequest(Request request, Object corId) {
-        if (request.getExchange().getStatus() != ExchangeStatus.ACTIVE) {
-            Object beanFromRequest = request.getBean();
-            if (beanFromRequest != bean) {
-                ReflectionUtils.callLifecycleMethod(beanFromRequest, 
PreDestroy.class);
-            }
-            //request.setBean(null);
-            //request.setExchange(null);
-            requests.remove(corId);
+    /**
+     * Checks if the request has ended.  If the request has ended, 
+     * <ul>
+     * <li>the request object is being removed from the list of pending 
requests</li> 
+     * <li>if the bean was created for that request, it is now being 
destroyed</li>
+     * </ul>
+     * 
+     * @param req the Request instance to check
+     */
+    protected void checkEndOfRequest(Request req) {
+        if (req.isFinished()) {
+            requests.remove(req.getCorrelationId());
+            if (req.getBean() != bean) {
+                ReflectionUtils.callLifecycleMethod(req.getBean(), 
PreDestroy.class);
+            }
         }
     }
 
@@ -443,6 +481,9 @@
                 correlationExpression = new 
org.apache.servicemix.expression.Expression() {
                     public Object evaluate(MessageExchange exchange, 
NormalizedMessage message) 
                         throws MessagingException {
+                        if (exchange.getProperty(CORRELATION_ID) != null) {
+                            return exchange.getProperty(CORRELATION_ID);
+                        }
                         return exchange.getExchangeId();
                     }
                 };
@@ -574,6 +615,7 @@
 
         public void send(MessageExchange messageExchange) throws 
MessagingException {
             try {
+                Request request = getOrCreateCurrentRequest(messageExchange);
                 if (messageExchange.getRole() == MessageExchange.Role.CONSUMER
                         && messageExchange.getStatus() == 
ExchangeStatus.ACTIVE) {
                     Request req = getOrCreateCurrentRequest(messageExchange);
@@ -581,6 +623,10 @@
                         throw new IllegalStateException("A bean acting as a 
consumer and using the channel "
                             + "to send exchanges must implement the 
MessageExchangeListener interface");
                     }
+                    req.addExchange(messageExchange);
+                }
+                if (messageExchange.getStatus() != ExchangeStatus.ACTIVE) {
+                    checkEndOfRequest(request);
                 }
                 getChannel().send(messageExchange);
             } catch (MessagingException e) {
@@ -591,12 +637,13 @@
         }
 
         public boolean sendSync(MessageExchange messageExchange) throws 
MessagingException {
+            checkEndOfRequest(messageExchange);
             return getChannel().sendSync(messageExchange);
         }
 
         public boolean sendSync(MessageExchange messageExchange, long l) 
throws MessagingException {
+            checkEndOfRequest(messageExchange);
             return getChannel().sendSync(messageExchange, l);
         }
-
     }
 }

Modified: 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/Request.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/Request.java?rev=745021&r1=745020&r2=745021&view=diff
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/Request.java
 (original)
+++ 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/Request.java
 Tue Feb 17 12:46:19 2009
@@ -17,27 +17,32 @@
 package org.apache.servicemix.bean.support;
 
 import java.lang.reflect.Method;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.MessageExchange;
 
+import org.apache.servicemix.bean.BeanEndpoint;
+
 public class Request {
     private Object bean;
-    private MessageExchange exchange;
-    private Set<String> sentExchanges;
     // Keep track of callbacks already called, so that the same callback
     // can not be called twice
     private Map<Method, Boolean> callbacks;
+    private Object correlationId;
+    private final Set<MessageExchange> exchanges = new 
HashSet<MessageExchange>();
     
     public Request() {
     }
     
-    public Request(Object bean, MessageExchange exchange) {
+    public Request(Object correlationId, Object bean, MessageExchange 
exchange) {
+        this.correlationId = correlationId;
         this.bean = bean;
-        this.exchange = exchange;
+        exchanges.add(exchange);
     }
     
     /**
@@ -53,26 +58,9 @@
     public void setBean(Object bean) {
         this.bean = bean;
     }
-    /**
-     * @return the exchange
-     */
-    public MessageExchange getExchange() {
-        return exchange;
-    }
-    /**
-     * @param exchange the exchange to set
-     */
-    public void setExchange(MessageExchange exchange) {
-        this.exchange = exchange;
-    }
-    /**
-     * @param id the id of the exchange sent 
-     */
-    public void addSentExchange(String id) {
-        if (sentExchanges == null) {
-            sentExchanges = new HashSet<String>();
-        }
-        sentExchanges.add(id);
+    
+    public Object getCorrelationId() {
+        return correlationId;
     }
 
     /**
@@ -85,4 +73,35 @@
         return callbacks;
     }
 
+    /**
+     * Check if this request is completely finished.  
+     *  
+     * @return <code>true</code> if both the Exchange is DONE and there are no 
more outstanding sent exchanges
+     */
+    public boolean isFinished() {
+        for (MessageExchange exchange : exchanges) {
+            if (ExchangeStatus.ACTIVE.equals(exchange.getStatus())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Add an exchange to this request.  All exchanges that are added to the 
request have to be finished 
+     * @param exchange
+     */
+    public void addExchange(MessageExchange exchange) {
+        exchanges.add(exchange);
+        exchange.setProperty(BeanEndpoint.CORRELATION_ID, correlationId);
+    }
+    
+    /**
+     * Get all the MessageExchanges that are involved in this request
+     * 
+     * @return an unmodifiable list of {@link MessageExchange}s
+     */
+    public Set<MessageExchange> getExchanges() {
+        return Collections.unmodifiableSet(exchanges);
+    }
 }

Added: 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/AbstractBeanComponentTest.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/AbstractBeanComponentTest.java?rev=745021&view=auto
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/AbstractBeanComponentTest.java
 (added)
+++ 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/AbstractBeanComponentTest.java
 Tue Feb 17 12:46:19 2009
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.tck.ExchangeCompletedListener;
+
+public abstract class AbstractBeanComponentTest extends TestCase {
+    
+    protected DefaultServiceMixClient client;
+    protected JBIContainer container;
+    protected ExchangeCompletedListener listener;
+    protected BeanComponent component;
+
+    protected void setUp() throws Exception {
+        container = new JBIContainer();
+        container.setEmbedded(true);
+        container.setUseMBeanServer(false);
+        container.setCreateMBeanServer(false);
+        configureContainer();
+        listener = new ExchangeCompletedListener();
+        container.addListener(listener);
+        
+        container.init();
+        container.start();
+
+        component = new BeanComponent();
+        container.activateComponent(component, "servicemix-bean");
+        
+        client = new DefaultServiceMixClient(container);
+    }
+
+    protected void tearDown() throws Exception {
+        listener.assertExchangeCompleted();
+        container.shutDown();
+    }
+
+    protected abstract void configureContainer();
+    
+    @SuppressWarnings("unchecked")
+    protected void assertBeanEndpointRequestsMapEmpty(BeanEndpoint 
beanEndpoint) throws Exception {
+        Field requestsMapField = 
BeanEndpoint.class.getDeclaredField("requests");
+        requestsMapField.setAccessible(true);
+        Map requestsMap = (Map) requestsMapField.get(beanEndpoint);
+        if (requestsMap.size() > 0) {
+            Thread.sleep(1000);
+        }
+        assertEquals("There should be no more pending requests on " + 
beanEndpoint, 0, requestsMap.size());
+    }
+}

Propchange: 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/AbstractBeanComponentTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutSedaTest.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutSedaTest.java?rev=745021&view=auto
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutSedaTest.java
 (added)
+++ 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutSedaTest.java
 Tue Feb 17 12:46:19 2009
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+public class BeanEndpointInOptionalOutSedaTest extends 
BeanEndpointInOptionalOutTest {
+    
+    protected void configureContainer() {
+        container.setFlowName("seda");
+    }    
+}

Propchange: 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutSedaTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutTest.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutTest.java?rev=745021&view=auto
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutTest.java
 (added)
+++ 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutTest.java
 Tue Feb 17 12:46:19 2009
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+import javax.annotation.Resource;
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.InOptionalOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.messaging.MessagingException;
+import javax.xml.namespace.QName;
+import javax.xml.transform.Source;
+
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.listener.MessageExchangeListener;
+
+/**
+ * A set of tests for checking InOptionalOut exchange handling by a bean 
endpoint
+ */
+public class BeanEndpointInOptionalOutTest extends AbstractBeanComponentTest {
+    
+    private static final QName IN_OPTIONAL_OUT_PRODUCER = new 
QName("urn:test", "ioo-producer");
+    private static final QName IN_OPTIONAL_OUT_CONSUMER = new 
QName("urn:test", "ioo-consumer");
+
+    protected void configureContainer() {
+        container.setFlowName("st");
+    }
+    
+    //we first have a set of tests that send an InOptionalOut exchange to the 
bean endpoint
+    public void testInOptionalOutWithBeanType() throws Exception {
+        BeanEndpoint endpoint = createBeanEndpoint(MyInOptionalOutBean.class, 
IN_OPTIONAL_OUT_PRODUCER);
+        component.addEndpoint(endpoint);
+        
+        MessageExchange io = client.createInOptionalOutExchange();
+        io.setService(IN_OPTIONAL_OUT_PRODUCER);
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(endpoint);        
+    }
+    
+    public void testInOptionalOutReturnsOut() throws Exception {
+        MyInOptionalOutBean bean = new MyInOptionalOutBean();
+        bean.response = new StringSource("<goodbye/>");
+        BeanEndpoint endpoint = createBeanEndpoint(bean, 
IN_OPTIONAL_OUT_PRODUCER);
+        component.addEndpoint(endpoint);
+        
+        MessageExchange io = client.createInOptionalOutExchange();
+        io.setService(IN_OPTIONAL_OUT_PRODUCER);
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
+        client.done(io);
+        assertBeanEndpointRequestsMapEmpty(endpoint);        
+    }
+
+    public void testInOptionalOutReturnsFault() throws Exception {
+        MyInOptionalOutBean bean = new MyInOptionalOutBean();
+        bean.fault = new StringSource("<failed_at_provider/>");
+        BeanEndpoint endpoint = createBeanEndpoint(bean, 
IN_OPTIONAL_OUT_PRODUCER);
+        component.addEndpoint(endpoint);
+        
+        MessageExchange io = client.createInOptionalOutExchange();
+        io.setService(IN_OPTIONAL_OUT_PRODUCER);
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
+        client.done(io);
+        assertBeanEndpointRequestsMapEmpty(endpoint);        
+    }
+
+    public void testInOptionalOutClientFault() throws Exception {
+        MyInOptionalOutBean bean = new MyInOptionalOutBean();
+        bean.response = new StringSource("<goodbye/>");
+        BeanEndpoint endpoint = createBeanEndpoint(bean, 
IN_OPTIONAL_OUT_PRODUCER);
+        component.addEndpoint(endpoint);
+        
+        MessageExchange io = client.createInOptionalOutExchange();
+        io.setService(IN_OPTIONAL_OUT_PRODUCER);
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
+        Fault fault = io.createFault();
+        fault.setContent(new StringSource("<failed_at_consumer/>"));
+        client.fail(io, fault);
+        assertBeanEndpointRequestsMapEmpty(endpoint);        
+    }
+
+    // this is a set of tests where the bean endpoint also acts as consumer 
and sends InOptionalOut exchanges
+    public void testInOptionalOutConsumerDone() throws Exception {
+        BeanEndpoint provider = createBeanEndpoint(MyInOptionalOutBean.class, 
IN_OPTIONAL_OUT_PRODUCER);
+        component.addEndpoint(provider);
+        BeanEndpoint consumer = createConsumerEndpoint();
+                
+        MessageExchange io = client.createInOnlyExchange();
+        io.setService(IN_OPTIONAL_OUT_CONSUMER);
+        io.setOperation(new QName("send"));
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(provider);        
+        assertBeanEndpointRequestsMapEmpty(consumer);
+    }
+    
+    public void testConsumerInOptionalOutProviderReturnsOut() throws Exception 
{
+        MyInOptionalOutBean bean = new MyInOptionalOutBean();
+        bean.response = new StringSource("<goodbye/>");
+        BeanEndpoint provider = createBeanEndpoint(bean, 
IN_OPTIONAL_OUT_PRODUCER);
+        component.addEndpoint(provider);
+        BeanEndpoint consumer = createConsumerEndpoint();
+                
+        MessageExchange io = client.createInOnlyExchange();
+        io.setService(IN_OPTIONAL_OUT_CONSUMER);
+        io.setOperation(new QName("send"));
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(provider);        
+        assertBeanEndpointRequestsMapEmpty(consumer);
+    }
+    
+    public void testConsumerInOptionalOutProviderReturnsFault() throws 
Exception {
+        MyInOptionalOutBean bean = new MyInOptionalOutBean();
+        bean.fault = new StringSource("<fault_at_provider/>");
+        BeanEndpoint provider = createBeanEndpoint(bean, 
IN_OPTIONAL_OUT_PRODUCER);
+        component.addEndpoint(provider);
+        BeanEndpoint consumer = createConsumerEndpoint();
+                
+        MessageExchange io = client.createInOnlyExchange();
+        io.setService(IN_OPTIONAL_OUT_CONSUMER);
+        io.setOperation(new QName("send"));
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(provider);        
+        assertBeanEndpointRequestsMapEmpty(consumer);
+    }
+    
+    public void testConsumerInOptionalOutConsumerReturnsFault() throws 
Exception {
+        MyInOptionalOutBean bean = new MyInOptionalOutBean();
+        bean.response = new StringSource("<goodbye/>");
+        BeanEndpoint provider = createBeanEndpoint(bean, 
IN_OPTIONAL_OUT_PRODUCER);
+        component.addEndpoint(provider);
+        BeanEndpoint consumer = createConsumerEndpoint();
+                
+        MessageExchange io = client.createInOnlyExchange();
+        io.setService(IN_OPTIONAL_OUT_CONSUMER);
+        io.setOperation(new QName("sendAndFault"));
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(provider);        
+        assertBeanEndpointRequestsMapEmpty(consumer);
+    }
+    
+    private BeanEndpoint createConsumerEndpoint() throws Exception {
+        MyConsumerBean bean = new MyConsumerBean();
+        bean.target = IN_OPTIONAL_OUT_PRODUCER;
+        BeanEndpoint endpoint = new BeanEndpoint();
+        endpoint.setBean(bean);
+        endpoint.setService(IN_OPTIONAL_OUT_CONSUMER);
+        endpoint.setEndpoint("endpoint");
+        component.addEndpoint(endpoint);
+        return endpoint;
+    }
+    
+    private BeanEndpoint createBeanEndpoint(Object bean, QName service) {
+        BeanEndpoint transformEndpoint = new BeanEndpoint();
+        transformEndpoint.setBean(bean);
+        transformEndpoint.setService(service);
+        transformEndpoint.setEndpoint("endpoint");
+        return transformEndpoint;
+    }
+      
+    private BeanEndpoint createBeanEndpoint(Class<?> type, QName service) {
+        BeanEndpoint endpoint = new BeanEndpoint();
+        endpoint.setBeanType(type);
+        endpoint.setService(service);
+        endpoint.setEndpoint("endpoint");
+        return endpoint;
+    }
+    
+    public static final class MyInOptionalOutBean implements MessageExchangeListener {
+        
+        private Source fault;
+        private Source response;
+        
+        @Resource
+        private DeliveryChannel channel;
+
+        public void onMessageExchange(MessageExchange exchange) throws 
MessagingException {
+            if (exchange instanceof InOptionalOut) {
+                onInOptionalOut((InOptionalOut) exchange);
+            } else {
+                exchange.setError(new Exception("Only InOptionalOut supported here"));
+            }
+        }
+
+        private void onInOptionalOut(InOptionalOut exchange) throws 
MessagingException {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                if (response != null) {
+                    exchange.setOutMessage(exchange.createMessage());
+                    exchange.getOutMessage().setContent(response);
+                    response = null;
+                } else if (fault != null) {
+                    exchange.setFault(exchange.createFault());
+                    exchange.getFault().setContent(fault);
+                    fault = null;
+                } else {
+                    exchange.setStatus(ExchangeStatus.DONE);
+                }
+                channel.send(exchange);
+            }
+        }
+    }
+    
+    public static final class MyConsumerBean implements 
MessageExchangeListener {
+        
+        @Resource
+        private DeliveryChannel channel;
+        private QName target;
+        private MessageExchange original;
+        private Source fault;
+                
+        public void send() throws MessagingException {
+            InOptionalOut ioo = channel.createExchangeFactory().createInOptionalOutExchange();
+            ioo.setService(target);
+            ioo.setInMessage(ioo.createMessage());
+            ioo.getMessage("in").setContent(new StringSource("<hello/>"));
+            channel.send(ioo);
+        }
+
+        public void onMessageExchange(MessageExchange exchange) throws 
MessagingException {
+            if (exchange.getRole() == Role.PROVIDER) {
+                original = exchange;
+                if (exchange.getOperation().equals(new QName("sendAndFault"))) {
+                    fault = new StringSource("<faulted_by_consumer/>");
+                }
+                send();
+            } else {                
+                if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                    if (fault != null) {
+                        exchange.setFault(exchange.createFault());
+                        exchange.getFault().setContent(fault);
+                        fault = null;
+                    } else {
+                        exchange.setStatus(ExchangeStatus.DONE);
+                        done();
+                    }
+                    channel.send(exchange);
+                } else {
+                    done();
+                }
+            }
+        }
+
+        private void done() throws MessagingException {
+            original.setStatus(ExchangeStatus.DONE);
+            channel.send(original);
+        }
+    }
+}

Propchange: 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/BeanEndpointInOptionalOutTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/TransformBeanSupportSedaFlowTest.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/TransformBeanSupportSedaFlowTest.java?rev=745021&view=auto
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/TransformBeanSupportSedaFlowTest.java
 (added)
+++ 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/TransformBeanSupportSedaFlowTest.java
 Tue Feb 17 12:46:19 2009
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+
+public class TransformBeanSupportSedaFlowTest extends TransformBeanSupportTest {
+
+    protected void configureContainer() {
+        container.setFlowName("seda");
+    }
+    
+}

Propchange: 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/TransformBeanSupportSedaFlowTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java?rev=745021&r1=745020&r2=745021&view=diff
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java
 (original)
+++ 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/TransformBeanSupportTest.java
 Tue Feb 17 12:46:19 2009
@@ -16,58 +16,36 @@
  */
 package org.apache.servicemix.bean;
 
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.annotation.Resource;
+import javax.jbi.messaging.DeliveryChannel;
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.xml.namespace.QName;
 
-import junit.framework.TestCase;
+import org.w3c.dom.Element;
 
 import org.apache.servicemix.MessageExchangeListener;
 import org.apache.servicemix.bean.support.ExchangeTarget;
 import org.apache.servicemix.bean.support.TransformBeanSupport;
-import org.apache.servicemix.client.DefaultServiceMixClient;
 import org.apache.servicemix.components.util.ComponentSupport;
-import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.components.util.EchoComponent;
+import org.apache.servicemix.expression.JAXPXPathExpression;
+import org.apache.servicemix.jbi.container.ActivationSpec;
 import org.apache.servicemix.jbi.jaxp.SourceTransformer;
 import org.apache.servicemix.jbi.jaxp.StringSource;
 import org.apache.servicemix.jbi.util.MessageUtil;
-import org.apache.servicemix.tck.ExchangeCompletedListener;
 import org.apache.servicemix.tck.ReceiverComponent;
 
-public class TransformBeanSupportTest extends TestCase {
-
-    protected DefaultServiceMixClient client;
-    protected JBIContainer container;
-    protected ExchangeCompletedListener listener;
-    protected BeanComponent component;
-
-    protected void setUp() throws Exception {
-        container = new JBIContainer();
-        container.setEmbedded(true);
-        container.setUseMBeanServer(false);
-        container.setCreateMBeanServer(false);
-        configureContainer();
-        listener = new ExchangeCompletedListener();
-        container.addListener(listener);
-        
-        container.init();
-        container.start();
-
-        component = new BeanComponent();
-        container.activateComponent(component, "servicemix-bean");
-        
-        client = new DefaultServiceMixClient(container);
-    }
-
-    protected void tearDown() throws Exception {
-        listener.assertExchangeCompleted();
-        container.shutDown();
-    }
-
-    protected void configureContainer() throws Exception {
+public class TransformBeanSupportTest extends AbstractBeanComponentTest {
+    
+    protected void configureContainer() {
         container.setFlowName("st");
     }
     
@@ -83,11 +61,34 @@
         
         io = client.receive();
         assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
-        assertEquals("<hello/>", new 
SourceTransformer().contentToString(io.getMessage("out")));
+        Element e = new SourceTransformer().toDOMElement(io.getMessage("out"));
+        assertEquals("hello", e.getNodeName());
+        
+        client.fail(io, new Exception("We failed to handle the reponse"));
+        assertEquals(ExchangeStatus.ERROR, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(transformEndpoint);
+    }
+    
+    public void testInOutWithBeanType() throws Exception {
+        BeanEndpoint endpoint = 
createBeanEndpoint(AssertSameInstancePojo.class);
+        component.addEndpoint(endpoint);
+        
+        MessageExchange io = client.createInOutExchange();
+        io.setService(new QName("transform"));
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
+        Element e = new SourceTransformer().toDOMElement(io.getMessage("out"));
+        assertEquals("hello", e.getNodeName());
         
         client.done(io);
         assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(endpoint);        
     }
+    
+    
 
     public void testInOnly() throws Exception {
         TransformBeanSupport transformer = createTransformer("receiver");
@@ -104,6 +105,28 @@
         
         io = client.receive();
         assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(transformEndpoint);
+        
+        receiver.getMessageList().assertMessagesReceived(1);
+    }
+    
+    public void testInOnlyWithCorrelation() throws Exception {
+        TransformBeanSupport transformer = createTransformer("receiver");
+        BeanEndpoint transformEndpoint = createBeanEndpoint(transformer);
+        transformEndpoint.setCorrelationExpression(new 
JAXPXPathExpression("/message/@id"));
+        component.addEndpoint(transformEndpoint);
+
+        ReceiverComponent receiver = new ReceiverComponent();
+        activateComponent(receiver, "receiver");
+        
+        MessageExchange io = client.createInOnlyExchange();
+        io.setService(new QName("transform"));
+        io.getMessage("in").setContent(new StringSource("<message id='1'/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(transformEndpoint);
         
         receiver.getMessageList().assertMessagesReceived(1);
     }
@@ -122,6 +145,26 @@
         
         io = client.receive();
         assertEquals(ExchangeStatus.ERROR, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(transformEndpoint);
+    }
+    
+    public void testInOnlyWithDestination() throws Exception {
+        BeanEndpoint endpoint = 
createBeanEndpoint(MyDestinationTransformer.class);
+        component.addEndpoint(endpoint);
+
+        ActivationSpec spec = new ActivationSpec(new EchoComponent());
+        spec.setService(new QName("test", "receiver"));
+        spec.setComponentName("receiver");
+        container.activateComponent(spec);
+        
+        MessageExchange io = client.createInOnlyExchange();
+        io.setService(new QName("transform"));
+        io.getMessage("in").setContent(new StringSource("<hello/>"));
+        client.send(io);
+        
+        io = client.receive();
+        assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(endpoint);
     }
 
     public void testRobustInOnly() throws Exception {
@@ -139,6 +182,7 @@
         
         io = client.receive();
         assertEquals(ExchangeStatus.DONE, io.getStatus());
+        assertBeanEndpointRequestsMapEmpty(transformEndpoint);
         
         receiver.getMessageList().assertMessagesReceived(1);
     }
@@ -159,6 +203,7 @@
         assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
         assertNotNull(io.getFault());
         client.done(io);
+        assertBeanEndpointRequestsMapEmpty(transformEndpoint);
     }
 
     public void testRobustInOnlyWithFaultAndError() throws Exception {
@@ -177,6 +222,7 @@
         assertEquals(ExchangeStatus.ACTIVE, io.getStatus());
         assertNotNull(io.getFault());
         client.fail(io, new Exception("I do not like faults"));
+        assertBeanEndpointRequestsMapEmpty(transformEndpoint);
     }
 
     private MyTransformer createTransformer(String targetService) {
@@ -195,6 +241,14 @@
         return transformEndpoint;
     }
     
+    private BeanEndpoint createBeanEndpoint(Class<?> type) {
+        BeanEndpoint endpoint = new BeanEndpoint();
+        endpoint.setBeanType(type);
+        endpoint.setService(new QName("transform"));
+        endpoint.setEndpoint("endpoint");
+        return endpoint;
+    }
+    
     protected void activateComponent(ComponentSupport comp, String name) 
throws Exception {
         comp.setService(new QName(name));
         comp.setEndpoint("endpoint");
@@ -207,8 +261,8 @@
             return true;
         }
     }
-
-    public static class ReturnErrorComponent extends ComponentSupport 
implements MessageExchangeListener {
+    
+    public static class ReturnErrorComponent extends ComponentSupport 
implements org.apache.servicemix.MessageExchangeListener {
 
         public void onMessageExchange(MessageExchange exchange) throws 
MessagingException {
             if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
@@ -217,7 +271,7 @@
         }
     }
 
-    public static class ReturnFaultComponent extends ComponentSupport 
implements MessageExchangeListener {
+    public static class ReturnFaultComponent extends ComponentSupport 
implements org.apache.servicemix.MessageExchangeListener {
         
         public void onMessageExchange(MessageExchange exchange) throws 
MessagingException {
             if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
@@ -228,4 +282,57 @@
         }
     }
     
+    public static class AssertSameInstancePojo implements 
MessageExchangeListener {
+        
+        @Resource 
+        private DeliveryChannel channel;
+        
+        private String id;
+
+        public void onMessageExchange(MessageExchange exchange) throws 
MessagingException {
+            assertId(exchange);
+            if (ExchangeStatus.ACTIVE.equals(exchange.getStatus())) {
+                MessageUtil.enableContentRereadability(exchange.getMessage("in"));
+                MessageUtil.transferInToOut(exchange, exchange);
+                channel.send(exchange);
+            }
+        }
+
+        private void assertId(MessageExchange exchange) {
+            if (exchange.getStatus().equals(ExchangeStatus.ACTIVE)) {
+                id = exchange.getExchangeId();
+            } else {
+                // make sure that the same object is being used to handle the Exchange with status DONE
+                assertEquals(id, exchange.getExchangeId());
+            }
+        }        
+    }
+    
+    public static class MyDestinationTransformer implements 
MessageExchangeListener {
+        
+        @org.apache.servicemix.bean.ExchangeTarget(uri = "service:test:receiver")
+        private Destination receiver;
+        
+        @Resource
+        private DeliveryChannel channel;
+        
+        public void onMessageExchange(MessageExchange exchange) throws 
MessagingException {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE && exchange instanceof InOnly) {
+                NormalizedMessage forward = receiver.createMessage();
+                forward.setContent(exchange.getMessage("in").getContent());
+                Future<NormalizedMessage> response = receiver.send(forward);
+                //let's wait for the response to come back
+                try {
+                    response.get();
+                    exchange.setStatus(ExchangeStatus.DONE);
+                } catch (InterruptedException e) {
+                    exchange.setError(e);
+                } catch (ExecutionException e) {
+                    exchange.setError(e);
+                } finally {
+                    channel.send(exchange);
+                }
+            }
+        }
+    }
 }

Added: 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/support/RequestTest.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/support/RequestTest.java?rev=745021&view=auto
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/support/RequestTest.java
 (added)
+++ 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/support/RequestTest.java
 Tue Feb 17 12:46:19 2009
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean.support;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.bean.BeanEndpoint;
+import org.apache.servicemix.tck.mock.MockMessageExchange;
+
+/**
+ * Test cases for {@link Request}
+ */
+public class RequestTest extends TestCase {
+    
+    public void testIsFinishedOnStatus() throws Exception {
+        MessageExchange exchange = createMockExchange("my-exchange-id");
+        Request request = new Request("my-correlation-id", new Object(), 
exchange);
+        assertFalse(request.isFinished());
+        exchange.setStatus(ExchangeStatus.DONE);
+        assertTrue(request.isFinished());
+    }
+    
+    public void testIsFinishedWhenAllExchangesDoneOrError() throws Exception {
+        MessageExchange exchange = createMockExchange("my-exchange-id");
+        Request request = new Request("my-correlation-id", new Object(), 
exchange);
+        assertFalse(request.isFinished());
+        
+        MessageExchange second = createMockExchange("my-second-id");
+        request.addExchange(second);
+        exchange.setStatus(ExchangeStatus.DONE);
+        assertFalse(request.isFinished());
+        
+        second.setStatus(ExchangeStatus.ERROR);
+        assertTrue(request.isFinished());
+    }
+    
+    public void testAddExchangeSetsCorrelationId() throws Exception {
+        MessageExchange exchange = createMockExchange("my-exchange-id");
+        Request request = new Request("my-correlation-id", new Object(), 
exchange);
+
+        MessageExchange second = createMockExchange("my-second-id");
+        request.addExchange(second);
+        assertEquals("my-correlation-id", second.getProperty(BeanEndpoint.CORRELATION_ID));
+    }
+    
+    public void testNoSentExchangeForCorrelationId() throws Exception {
+        MessageExchange exchange = createMockExchange("my-exchange-id");
+        Request request = new Request("my-correlation-id", new Object(), 
exchange);
+        request.addExchange(exchange);
+        assertEquals("We shouldn't have duplicate MessageExchange instances", 1, request.getExchanges().size());
+    }
+    
+    private MessageExchange createMockExchange(String id) {
+        MockMessageExchange exchange = new MockMessageExchange();
+        exchange.setExchangeId(id);
+        exchange.setStatus(ExchangeStatus.ACTIVE);
+        return exchange;
+    }
+}

Propchange: 
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/support/RequestTest.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to