[ 
https://issues.apache.org/jira/browse/CAMEL-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

moritz löser updated CAMEL-9222:
--------------------------------
    Description: 
If using onCompletion (or Routepolicy.onExchangeDone) in a route that splits 
and aggregates onCompletion is called on original exchange before any exchange 
is consumed if parallelProcessing() is set on aggregator definition.

I created a unit test with a simple route. the tests checks the order of calls 
on the mocks. the call on finish should be last (just (un)comment 
parallelProcessing to see the effect):

{code:java}
package de.ml;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.when;

import java.util.Arrays;
import java.util.UUID;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.direct.DirectEndpoint;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

@RunWith(MockitoJUnitRunner.class)
public class TestAggregator {
    private static final String SIZE = "size";
    private static final String ID = "id";
    private final CamelContext context = new DefaultCamelContext();
    @Mock
    private Processor mockProcessor;

    @Mock
    private Processor mockHeaderProcessor;

    @Mock
    private AggregationStrategy mockAggregator;

    @Mock
    private Processor onFinishMock;
    private DirectEndpoint trigger;
    private MockEndpoint out;

    @Before
    public void prepareMocks() throws Exception {
        when( mockAggregator.aggregate( any( Exchange.class ), any( 
Exchange.class ) ) ).then(
            new Answer<Exchange>() {

                @Override
                public Exchange answer( InvocationOnMock invocation ) throws 
Throwable {
                    // always just return old exchange
                    return (Exchange) invocation.getArguments()[1];
                }
            } );
        doAnswer( new Answer<Void>() {

            @Override
            public Void answer( InvocationOnMock invocation ) throws Throwable {
                Exchange exchange = (Exchange) invocation.getArguments()[0];
                exchange.getIn().setHeader( ID, UUID.randomUUID() );
                exchange.getIn().setHeader( SIZE, 1 );
                return null;
            }
        } ).when( mockHeaderProcessor ).process( any( Exchange.class ) );
    }

    @Test
    public void testRoute() throws Exception {
        trigger = context.getEndpoint( "direct:trigger", DirectEndpoint.class );
        out = context.getEndpoint( "mock:out", MockEndpoint.class );
        RoutesBuilder testObject = getTestObject();
        context.addRoutes( testObject );
        out.expectedMessageCount( 3 );
        context.start();
        context.createProducerTemplate().sendBody( trigger, Arrays.asList( 
"1a", "1b", "2c" ) );
        out.assertIsSatisfied();
        InOrder order = inOrder( mockHeaderProcessor, mockProcessor, 
onFinishMock );
        order.verify( mockHeaderProcessor, atLeastOnce() ).process( any( 
Exchange.class ) );
        order.verify( mockProcessor, atLeastOnce() ).process( any( 
Exchange.class ) );
        order.verify( onFinishMock ).process( any( Exchange.class ) );

    }

    private RouteBuilder getTestObject() {
        return new RouteBuilder() {

            @Override
            public void configure() throws Exception {
                from(trigger)
                    .onCompletion().process(onFinishMock).end()
                    .split().body()
                    .process(mockHeaderProcessor)
                    .aggregate(header(ID), mockAggregator)
                    .completionSize(header(SIZE))
                    //.parallelProcessing()
                    .process(mockProcessor)
                    .to(out);
            }
        };
    }
}
{code}

Imho switching parallelProcessing() shouldn't have impact on when exchange is 
complete?

I just checked without the aggregation the test is also green. Just exchange 
the route definition:

{code:java}
            @Override
            public void configure() throws Exception {
                from(trigger)
                    .onCompletion().process(onFinishMock).end()
                    .split().body()
                    .parallelProcessing()
                    .process(mockHeaderProcessor)
                    //.aggregate(header(ID), mockAggregator)
                    //.completionSize(header(SIZE))
                    .process(mockProcessor)
                    .to(out);
            }
{code}


  was:
If using onCompletion (or Routepolicy.onExchangeDone) in a route that splits 
and aggregates onCompletion is called on original exchange before any exchange 
is consumed if parallelProcessing() is set on aggregator definition.

I created a unit test with a simple route. the tests checks the order of calls 
on the mocks. the call on finish should be last (just (un)comment 
parallelProcessing to see the effect):

{code:java}
package de.ml;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.when;

import java.util.Arrays;
import java.util.UUID;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.direct.DirectEndpoint;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

@RunWith(MockitoJUnitRunner.class)
public class TestAggregator {
    private static final String SIZE = "size";
    private static final String ID = "id";
    private final CamelContext context = new DefaultCamelContext();
    @Mock
    private Processor mockProcessor;

    @Mock
    private Processor mockHeaderProcessor;

    @Mock
    private AggregationStrategy mockAggregator;

    @Mock
    private Processor onFinishMock;
    private DirectEndpoint trigger;
    private MockEndpoint out;

    @Before
    public void prepareMocks() throws Exception {
        when( mockAggregator.aggregate( any( Exchange.class ), any( 
Exchange.class ) ) ).then(
            new Answer<Exchange>() {

                @Override
                public Exchange answer( InvocationOnMock invocation ) throws 
Throwable {
                    // always just return old exchange
                    return (Exchange) invocation.getArguments()[1];
                }
            } );
        doAnswer( new Answer<Void>() {

            @Override
            public Void answer( InvocationOnMock invocation ) throws Throwable {
                Exchange exchange = (Exchange) invocation.getArguments()[0];
                exchange.getIn().setHeader( ID, UUID.randomUUID() );
                exchange.getIn().setHeader( SIZE, 1 );
                return null;
            }
        } ).when( mockHeaderProcessor ).process( any( Exchange.class ) );
    }

    @Test
    public void testRoute() throws Exception {
        trigger = context.getEndpoint( "direct:trigger", DirectEndpoint.class );
        out = context.getEndpoint( "mock:out", MockEndpoint.class );
        RoutesBuilder testObject = getTestObject();
        context.addRoutes( testObject );
        out.expectedMessageCount( 3 );
        context.start();
        context.createProducerTemplate().sendBody( trigger, Arrays.asList( 
"1a", "1b", "2c" ) );
        out.assertIsSatisfied();
        InOrder order = inOrder( mockHeaderProcessor, mockProcessor, 
onFinishMock );
        order.verify( mockHeaderProcessor, atLeastOnce() ).process( any( 
Exchange.class ) );
        order.verify( mockProcessor, atLeastOnce() ).process( any( 
Exchange.class ) );
        order.verify( onFinishMock ).process( any( Exchange.class ) );

    }

    private RouteBuilder getTestObject() {
        return new RouteBuilder() {

            @Override
            public void configure() throws Exception {
                from(trigger)
                    .onCompletion().process(onFinishMock).end()
                    .split().body()
                    .process(mockHeaderProcessor)
                    .aggregate(header(ID), mockAggregator)
                    .completionSize(header(SIZE))
                    //.parallelProcessing()
                    .process(mockProcessor)
                    .to(out);
            }
        };
    }
}
{code}

Imho switching parallelProcessing() shouldn't have impact on when exchange is 
complete?


> Aggreagator breaks onCompletion semantic if used with parallelProcessing
> ------------------------------------------------------------------------
>
>                 Key: CAMEL-9222
>                 URL: https://issues.apache.org/jira/browse/CAMEL-9222
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.14.3
>            Reporter: moritz löser
>
> If using onCompletion (or Routepolicy.onExchangeDone) in a route that splits 
> and aggregates onCompletion is called on original exchange before any 
> exchange is consumed if parallelProcessing() is set on aggregator definition.
> I created a unit test with a simple route. the tests checks the order of 
> calls on the mocks. the call on finish should be last (just (un)comment 
> parallelProcessing to see the effect):
> {code:java}
> package de.ml;
> import static org.mockito.Matchers.any;
> import static org.mockito.Mockito.atLeastOnce;
> import static org.mockito.Mockito.doAnswer;
> import static org.mockito.Mockito.inOrder;
> import static org.mockito.Mockito.when;
> import java.util.Arrays;
> import java.util.UUID;
> import org.apache.camel.CamelContext;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.RoutesBuilder;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.direct.DirectEndpoint;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.impl.DefaultCamelContext;
> import org.apache.camel.processor.aggregate.AggregationStrategy;
> import org.junit.Before;
> import org.junit.Test;
> import org.junit.runner.RunWith;
> import org.mockito.InOrder;
> import org.mockito.Mock;
> import org.mockito.invocation.InvocationOnMock;
> import org.mockito.runners.MockitoJUnitRunner;
> import org.mockito.stubbing.Answer;
> @RunWith(MockitoJUnitRunner.class)
> public class TestAggregator {
>     private static final String SIZE = "size";
>     private static final String ID = "id";
>     private final CamelContext context = new DefaultCamelContext();
>     @Mock
>     private Processor mockProcessor;
>     @Mock
>     private Processor mockHeaderProcessor;
>     @Mock
>     private AggregationStrategy mockAggregator;
>     @Mock
>     private Processor onFinishMock;
>     private DirectEndpoint trigger;
>     private MockEndpoint out;
>     @Before
>     public void prepareMocks() throws Exception {
>         when( mockAggregator.aggregate( any( Exchange.class ), any( 
> Exchange.class ) ) ).then(
>             new Answer<Exchange>() {
>                 @Override
>                 public Exchange answer( InvocationOnMock invocation ) throws 
> Throwable {
>                     // always just return old exchange
>                     return (Exchange) invocation.getArguments()[1];
>                 }
>             } );
>         doAnswer( new Answer<Void>() {
>             @Override
>             public Void answer( InvocationOnMock invocation ) throws 
> Throwable {
>                 Exchange exchange = (Exchange) invocation.getArguments()[0];
>                 exchange.getIn().setHeader( ID, UUID.randomUUID() );
>                 exchange.getIn().setHeader( SIZE, 1 );
>                 return null;
>             }
>         } ).when( mockHeaderProcessor ).process( any( Exchange.class ) );
>     }
>     @Test
>     public void testRoute() throws Exception {
>         trigger = context.getEndpoint( "direct:trigger", DirectEndpoint.class 
> );
>         out = context.getEndpoint( "mock:out", MockEndpoint.class );
>         RoutesBuilder testObject = getTestObject();
>         context.addRoutes( testObject );
>         out.expectedMessageCount( 3 );
>         context.start();
>         context.createProducerTemplate().sendBody( trigger, Arrays.asList( 
> "1a", "1b", "2c" ) );
>         out.assertIsSatisfied();
>         InOrder order = inOrder( mockHeaderProcessor, mockProcessor, 
> onFinishMock );
>         order.verify( mockHeaderProcessor, atLeastOnce() ).process( any( 
> Exchange.class ) );
>         order.verify( mockProcessor, atLeastOnce() ).process( any( 
> Exchange.class ) );
>         order.verify( onFinishMock ).process( any( Exchange.class ) );
>     }
>     private RouteBuilder getTestObject() {
>         return new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>                 from(trigger)
>                     .onCompletion().process(onFinishMock).end()
>                     .split().body()
>                     .process(mockHeaderProcessor)
>                     .aggregate(header(ID), mockAggregator)
>                     .completionSize(header(SIZE))
>                     //.parallelProcessing()
>                     .process(mockProcessor)
>                     .to(out);
>             }
>         };
>     }
> }
> {code}
> Imho switching parallelProcessing() shouldn't have impact on when exchange is 
> complete?
> I just checked without the aggregation the test is also green. Just exchange 
> the route definition:
> {code:java}
>             @Override
>             public void configure() throws Exception {
>                 from(trigger)
>                     .onCompletion().process(onFinishMock).end()
>                     .split().body()
>                     .parallelProcessing()
>                     .process(mockHeaderProcessor)
>                     //.aggregate(header(ID), mockAggregator)
>                     //.completionSize(header(SIZE))
>                     .process(mockProcessor)
>                     .to(out);
>             }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to