[
https://issues.apache.org/jira/browse/CAMEL-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
moritz löser updated CAMEL-9222:
--------------------------------
Description:
When onCompletion (or RoutePolicy.onExchangeDone) is used in a route that splits
and aggregates, onCompletion is called on the original exchange before any exchange
is consumed if parallelProcessing() is set on the aggregator definition.
I created a unit test with a simple route. The test checks the order of calls
on the mocks. The call on onFinishMock should be last (just (un)comment
parallelProcessing() to see the effect):
{code:java}
package de.ml;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.UUID;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.direct.DirectEndpoint;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
/**
 * Reproduction test for the reported onCompletion ordering problem.
 *
 * A list body is sent to a direct endpoint, split, tagged with headers,
 * aggregated and forwarded to a mock endpoint. The test then verifies that
 * the onCompletion processor ({@code onFinishMock}) is invoked after the
 * header processor and after the downstream processor.
 */
@RunWith(MockitoJUnitRunner.class)
public class TestAggregator {
// Header names used as aggregation completion size and correlation key.
private static final String SIZE = "size";
private static final String ID = "id";
private final CamelContext context = new DefaultCamelContext();
// Processor placed after the aggregator in the route.
@Mock
private Processor mockProcessor;
// Processor placed directly after the splitter; stubbed to set the ID and SIZE headers.
@Mock
private Processor mockHeaderProcessor;
@Mock
private AggregationStrategy mockAggregator;
// Processor registered through onCompletion(); expected to be called last.
@Mock
private Processor onFinishMock;
// Both endpoints are resolved inside testRoute() before the route is built.
private DirectEndpoint trigger;
private MockEndpoint out;
/**
 * Stubs the aggregation strategy and the header processor before each test.
 *
 * @throws Exception declared because {@code Processor.process} is invoked while stubbing
 */
@Before
public void prepareMocks() throws Exception {
when( mockAggregator.aggregate( any( Exchange.class ), any(
Exchange.class ) ) ).then(
new Answer<Exchange>() {
@Override
public Exchange answer( InvocationOnMock invocation ) throws
Throwable {
// Always return the second argument (index 1) passed to aggregate(...).
// NOTE(review): the original comment called this the "old exchange"; in
// Camel's AggregationStrategy.aggregate(oldExchange, newExchange) index 1
// is presumably the new exchange - confirm.
return (Exchange) invocation.getArguments()[1];
}
} );
doAnswer( new Answer<Void>() {
@Override
public Void answer( InvocationOnMock invocation ) throws Throwable {
Exchange exchange = (Exchange) invocation.getArguments()[0];
// Random ID: every exchange gets its own correlation key for aggregate(header(ID), ...).
exchange.getIn().setHeader( ID, UUID.randomUUID() );
// SIZE is read by completionSize(header(SIZE)) in the route.
exchange.getIn().setHeader( SIZE, 1 );
return null;
}
} ).when( mockHeaderProcessor ).process( any( Exchange.class ) );
}
/**
 * Builds and starts the route, sends a three-element list to the trigger
 * endpoint and verifies the call order on the mocks: header processor,
 * then downstream processor, then the onCompletion processor.
 *
 * @throws Exception if route setup, sending or the mock endpoint assertion fails
 */
@Test
public void testRoute() throws Exception {
trigger = context.getEndpoint( "direct:trigger", DirectEndpoint.class );
out = context.getEndpoint( "mock:out", MockEndpoint.class );
RoutesBuilder testObject = getTestObject();
context.addRoutes( testObject );
// Three messages are expected at mock:out; three list elements are sent below.
out.expectedMessageCount( 3 );
context.start();
context.createProducerTemplate().sendBody( trigger, Arrays.asList(
"1a", "1b", "2c" ) );
out.assertIsSatisfied();
// Order check: the onCompletion processor must come after the other two mocks.
InOrder order = inOrder( mockHeaderProcessor, mockProcessor,
onFinishMock );
order.verify( mockHeaderProcessor, atLeastOnce() ).process( any(
Exchange.class ) );
order.verify( mockProcessor, atLeastOnce() ).process( any(
Exchange.class ) );
order.verify( onFinishMock ).process( any( Exchange.class ) );
}
/**
 * Creates the route under test: trigger -> split -> header processor ->
 * aggregate -> downstream processor -> mock:out, with onFinishMock
 * registered through onCompletion().
 *
 * @return the route builder to add to the Camel context
 */
private RouteBuilder getTestObject() {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from(trigger)
.onCompletion().process(onFinishMock).end()
.split().body()
.process(mockHeaderProcessor)
.aggregate(header(ID), mockAggregator)
.completionSize(header(SIZE))
// Uncomment the next line to enable parallelProcessing() on the aggregator
// and reproduce the reported ordering problem.
//.parallelProcessing()
.process(mockProcessor)
.to(out);
}
};
}
}
{code}
IMHO, switching parallelProcessing() on or off shouldn't have an impact on when the
exchange is complete.
I just checked: without the aggregation, the test is also green. Just replace
the route definition with:
{code:java}
/**
 * Alternative route definition without the aggregator: the split parts go
 * through the header processor and the downstream processor straight to the
 * output endpoint. The surrounding text reports that the test is green with
 * this variant.
 */
@Override
public void configure() throws Exception {
from(trigger)
.onCompletion().process(onFinishMock).end()
.split().body()
// In this variant parallelProcessing() directly follows split().body().
.parallelProcessing()
.process(mockHeaderProcessor)
// Aggregation is disabled for this variant.
//.aggregate(header(ID), mockAggregator)
//.completionSize(header(SIZE))
.process(mockProcessor)
.to(out);
}
{code}
was:
When onCompletion (or RoutePolicy.onExchangeDone) is used in a route that splits
and aggregates, onCompletion is called on the original exchange before any exchange
is consumed if parallelProcessing() is set on the aggregator definition.
I created a unit test with a simple route. The test checks the order of calls
on the mocks. The call on onFinishMock should be last (just (un)comment
parallelProcessing() to see the effect):
{code:java}
package de.ml;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.UUID;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.direct.DirectEndpoint;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
/**
 * Reproduction test for the reported onCompletion ordering problem.
 *
 * A list body is sent to a direct endpoint, split, tagged with headers,
 * aggregated and forwarded to a mock endpoint. The test then verifies that
 * the onCompletion processor ({@code onFinishMock}) is invoked after the
 * header processor and after the downstream processor.
 */
@RunWith(MockitoJUnitRunner.class)
public class TestAggregator {
// Header names used as aggregation completion size and correlation key.
private static final String SIZE = "size";
private static final String ID = "id";
private final CamelContext context = new DefaultCamelContext();
// Processor placed after the aggregator in the route.
@Mock
private Processor mockProcessor;
// Processor placed directly after the splitter; stubbed to set the ID and SIZE headers.
@Mock
private Processor mockHeaderProcessor;
@Mock
private AggregationStrategy mockAggregator;
// Processor registered through onCompletion(); expected to be called last.
@Mock
private Processor onFinishMock;
// Both endpoints are resolved inside testRoute() before the route is built.
private DirectEndpoint trigger;
private MockEndpoint out;
/**
 * Stubs the aggregation strategy and the header processor before each test.
 *
 * @throws Exception declared because {@code Processor.process} is invoked while stubbing
 */
@Before
public void prepareMocks() throws Exception {
when( mockAggregator.aggregate( any( Exchange.class ), any(
Exchange.class ) ) ).then(
new Answer<Exchange>() {
@Override
public Exchange answer( InvocationOnMock invocation ) throws
Throwable {
// Always return the second argument (index 1) passed to aggregate(...).
// NOTE(review): the original comment called this the "old exchange"; in
// Camel's AggregationStrategy.aggregate(oldExchange, newExchange) index 1
// is presumably the new exchange - confirm.
return (Exchange) invocation.getArguments()[1];
}
} );
doAnswer( new Answer<Void>() {
@Override
public Void answer( InvocationOnMock invocation ) throws Throwable {
Exchange exchange = (Exchange) invocation.getArguments()[0];
// Random ID: every exchange gets its own correlation key for aggregate(header(ID), ...).
exchange.getIn().setHeader( ID, UUID.randomUUID() );
// SIZE is read by completionSize(header(SIZE)) in the route.
exchange.getIn().setHeader( SIZE, 1 );
return null;
}
} ).when( mockHeaderProcessor ).process( any( Exchange.class ) );
}
/**
 * Builds and starts the route, sends a three-element list to the trigger
 * endpoint and verifies the call order on the mocks: header processor,
 * then downstream processor, then the onCompletion processor.
 *
 * @throws Exception if route setup, sending or the mock endpoint assertion fails
 */
@Test
public void testRoute() throws Exception {
trigger = context.getEndpoint( "direct:trigger", DirectEndpoint.class );
out = context.getEndpoint( "mock:out", MockEndpoint.class );
RoutesBuilder testObject = getTestObject();
context.addRoutes( testObject );
// Three messages are expected at mock:out; three list elements are sent below.
out.expectedMessageCount( 3 );
context.start();
context.createProducerTemplate().sendBody( trigger, Arrays.asList(
"1a", "1b", "2c" ) );
out.assertIsSatisfied();
// Order check: the onCompletion processor must come after the other two mocks.
InOrder order = inOrder( mockHeaderProcessor, mockProcessor,
onFinishMock );
order.verify( mockHeaderProcessor, atLeastOnce() ).process( any(
Exchange.class ) );
order.verify( mockProcessor, atLeastOnce() ).process( any(
Exchange.class ) );
order.verify( onFinishMock ).process( any( Exchange.class ) );
}
/**
 * Creates the route under test: trigger -> split -> header processor ->
 * aggregate -> downstream processor -> mock:out, with onFinishMock
 * registered through onCompletion().
 *
 * @return the route builder to add to the Camel context
 */
private RouteBuilder getTestObject() {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from(trigger)
.onCompletion().process(onFinishMock).end()
.split().body()
.process(mockHeaderProcessor)
.aggregate(header(ID), mockAggregator)
.completionSize(header(SIZE))
// Uncomment the next line to enable parallelProcessing() on the aggregator
// and reproduce the reported ordering problem.
//.parallelProcessing()
.process(mockProcessor)
.to(out);
}
};
}
}
{code}
IMHO, switching parallelProcessing() on or off shouldn't have an impact on when the
exchange is complete.
> Aggregator breaks onCompletion semantics if used with parallelProcessing
> ------------------------------------------------------------------------
>
> Key: CAMEL-9222
> URL: https://issues.apache.org/jira/browse/CAMEL-9222
> Project: Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 2.14.3
> Reporter: moritz löser
>
> When onCompletion (or RoutePolicy.onExchangeDone) is used in a route that splits
> and aggregates, onCompletion is called on the original exchange before any
> exchange is consumed if parallelProcessing() is set on the aggregator definition.
> I created a unit test with a simple route. The test checks the order of
> calls on the mocks. The call on onFinishMock should be last (just (un)comment
> parallelProcessing() to see the effect):
> {code:java}
> package de.ml;
> import static org.mockito.Matchers.any;
> import static org.mockito.Mockito.atLeastOnce;
> import static org.mockito.Mockito.doAnswer;
> import static org.mockito.Mockito.inOrder;
> import static org.mockito.Mockito.when;
> import java.util.Arrays;
> import java.util.UUID;
> import org.apache.camel.CamelContext;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.RoutesBuilder;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.direct.DirectEndpoint;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.impl.DefaultCamelContext;
> import org.apache.camel.processor.aggregate.AggregationStrategy;
> import org.junit.Before;
> import org.junit.Test;
> import org.junit.runner.RunWith;
> import org.mockito.InOrder;
> import org.mockito.Mock;
> import org.mockito.invocation.InvocationOnMock;
> import org.mockito.runners.MockitoJUnitRunner;
> import org.mockito.stubbing.Answer;
> @RunWith(MockitoJUnitRunner.class)
> public class TestAggregator {
> private static final String SIZE = "size"; // header read by completionSize(header(SIZE))
> private static final String ID = "id"; // header used as aggregation correlation key
> private final CamelContext context = new DefaultCamelContext();
> @Mock
> private Processor mockProcessor; // placed after the aggregator in the route
> @Mock
> private Processor mockHeaderProcessor; // placed right after the splitter; sets ID and SIZE
> @Mock
> private AggregationStrategy mockAggregator;
> @Mock
> private Processor onFinishMock; // registered through onCompletion(); expected to be called last
> private DirectEndpoint trigger;
> private MockEndpoint out;
> @Before
> public void prepareMocks() throws Exception {
> when( mockAggregator.aggregate( any( Exchange.class ), any(
> Exchange.class ) ) ).then(
> new Answer<Exchange>() {
> @Override
> public Exchange answer( InvocationOnMock invocation ) throws
> Throwable {
> // always return argument index 1 of aggregate(...). NOTE(review): originally described as the "old exchange"; in Camel's aggregate(oldExchange, newExchange) index 1 is presumably the new exchange - confirm
> return (Exchange) invocation.getArguments()[1];
> }
> } );
> doAnswer( new Answer<Void>() {
> @Override
> public Void answer( InvocationOnMock invocation ) throws
> Throwable {
> Exchange exchange = (Exchange) invocation.getArguments()[0];
> exchange.getIn().setHeader( ID, UUID.randomUUID() ); // random ID: own correlation key per exchange
> exchange.getIn().setHeader( SIZE, 1 ); // read by completionSize(header(SIZE))
> return null;
> }
> } ).when( mockHeaderProcessor ).process( any( Exchange.class ) );
> }
> @Test
> public void testRoute() throws Exception {
> trigger = context.getEndpoint( "direct:trigger", DirectEndpoint.class
> );
> out = context.getEndpoint( "mock:out", MockEndpoint.class );
> RoutesBuilder testObject = getTestObject();
> context.addRoutes( testObject );
> out.expectedMessageCount( 3 ); // three list elements are sent below
> context.start();
> context.createProducerTemplate().sendBody( trigger, Arrays.asList(
> "1a", "1b", "2c" ) );
> out.assertIsSatisfied();
> InOrder order = inOrder( mockHeaderProcessor, mockProcessor,
> onFinishMock ); // order check: onFinishMock must come after the other two mocks
> order.verify( mockHeaderProcessor, atLeastOnce() ).process( any(
> Exchange.class ) );
> order.verify( mockProcessor, atLeastOnce() ).process( any(
> Exchange.class ) );
> order.verify( onFinishMock ).process( any( Exchange.class ) );
> }
> private RouteBuilder getTestObject() {
> return new RouteBuilder() {
> @Override
> public void configure() throws Exception {
> from(trigger)
> .onCompletion().process(onFinishMock).end()
> .split().body()
> .process(mockHeaderProcessor)
> .aggregate(header(ID), mockAggregator)
> .completionSize(header(SIZE))
> //.parallelProcessing() -- uncomment to enable parallelProcessing() on the aggregator and reproduce the reported problem
> .process(mockProcessor)
> .to(out);
> }
> };
> }
> }
> {code}
> IMHO, switching parallelProcessing() on or off shouldn't have an impact on when
> the exchange is complete.
> I just checked: without the aggregation, the test is also green. Just replace
> the route definition with:
> {code:java}
> @Override
> public void configure() throws Exception { // variant of the route without the aggregator
> from(trigger)
> .onCompletion().process(onFinishMock).end()
> .split().body()
> .parallelProcessing() // in this variant it directly follows split().body()
> .process(mockHeaderProcessor)
> //.aggregate(header(ID), mockAggregator) -- aggregation disabled for this variant
> //.completionSize(header(SIZE))
> .process(mockProcessor)
> .to(out);
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)