ThrottlingInflightRoutePolicy can deadlock
------------------------------------------

                 Key: CAMEL-4149
                 URL: https://issues.apache.org/jira/browse/CAMEL-4149
             Project: Camel
          Issue Type: Bug
          Components: camel-core
    Affects Versions: 2.7.0
            Reporter: Søren Markert


Using ThrottlingInflightRoutePolicy can deadlock a route in some situations. 
The unit test pasted in below shows one such situation.

What happens is that the bottom route processes its first exchange, then 
suspends. Since it is suspended it will not take the next exchange from the 
seda queue, and so it will never check whether it should re-enable the route.

Perhaps it will work by putting the check to re-enable the route in the 
onExchangeBegin method, if that is called even when the route is suspended?

{code}
import org.apache.camel.Exchange;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultInflightRepository;
import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
import org.apache.camel.impl.ThrottlingInflightRoutePolicy.ThrottlingScope;
import org.apache.camel.test.CamelTestSupport;

public class ThrottleTest extends CamelTestSupport {

        @Produce(uri = "direct:input")
        protected ProducerTemplate input;
        protected MockEndpoint resultEndpoint;

        @Override
        protected RouteBuilder createRouteBuilder() throws Exception {
            return new RouteBuilder() {
            public void configure() {
                resultEndpoint = new MockEndpoint("mock:result");
                resultEndpoint.setCamelContext(getContext());
                
                getContext().setInflightRepository(new 
DefaultInflightRepository() {
                        @Override
                    public void add(Exchange exchange) {
                                super.add(exchange);
                                System.out.println("                        add 
" + this.size());
                    }
                        @Override
                        public void remove(Exchange exchange) {
                                super.remove(exchange);
                                System.out.println("                     remove 
" + this.size());
                        }
                        
                });
                
                ThrottlingInflightRoutePolicy throttler = new 
ThrottlingInflightRoutePolicy();
                
                throttler.setMaxInflightExchanges(1);
                throttler.setScope(ThrottlingScope.Context);

                from("direct:input")
                        .inOnly("seda:hey", "seda:hey", "seda:hey", "seda:hey", 
"seda:hey")
                        .delay(1000)
                        .inOnly("log:inputDone");
                
                from("seda:hey")
                        .routePolicy(throttler)
                        .inOut("log:outputDone")
                        .to(resultEndpoint);
            }
        };
        }
        
        public void testThatAllExchangesAreReceived() throws Exception {
                input.sendBody("hello");
                
                resultEndpoint.expectedMessageCount(5);
                resultEndpoint.assertIsSatisfied();
        }
}
{code}

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira


Reply via email to