Andre Weickel created CAMEL-21114:
-------------------------------------
Summary: ZipSplitter with AggregationStrategy does not aggregate
all splits
Key: CAMEL-21114
URL: https://issues.apache.org/jira/browse/CAMEL-21114
Project: Camel
Issue Type: Bug
Affects Versions: 3.14.10
Reporter: Andre Weickel
A transacted route with ZipSplitter and Aggregation Strategy does not aggregate
the last zip file entry. The issue only occurs for transacted routes.
Example:
_Zip Archive_
* _A.xml_
* _B.xml_
Both splits are processed but only for the first exchange (A.xml) the aggregate
method is called.
For a zip archive with two entries the doRun() method of
MulticastTransactedTask is called three times. The third time iterator.next()
returns null although hasNext() was true. As a result the doDone() method is
called but there is still a task in the queue (with the second exchange). This
task is processed after doDone() was executed but it’s not aggregated because
of a done check in aggregate() of MulticastTransactedTask.
We found the problem in Camel 3.14, but it is still present in Camel 3.22.
It can be reproduced with the following test (it works if you remove the
transacted tag from the route)
{code:java}
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.dataformat.zipfile.ZipSplitter;
import org.apache.camel.spring.spi.SpringTransactionPolicy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.h2.jdbcx.JdbcDataSource;
import org.junit.Test;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
public class ZipSplitterTest extends CamelTestSupport {
String zipArchiveWithTwoFiles
="UEsDBBQAAAAIAFlrtFDFAfecUAAAAB4BAAALAAAAT3JkZXJzMS54bWyzyS9KSS0qtuPl4oQwQSxOm8wUOxMb/cwUCK+gKD+lNLkEzOG0yUvMTbWDCik42uiD+WB1+kgKbfThxqEZbEqUwU6kG2xGlMHOhA2GsortAFBLAwQUAAAACABBW9hQgBf0tVgAAAAqAQAACwAAAE9yZGVyczIueG1ss8kvSkktKrbj5eKEMEEsTpvMFDtDQ0Mb/cwUCL+gKD+lNLkEzOG0yUvMTbWDCimA1YFFwCr1kZTa6MONRDPcyMiIKMPB6kg13NjYmCjDweoIGQ5lFdsBAFBLAQIfABQAAAAIAFlrtFDFAfecUAAAAB4BAAALACQAAAAAAAAAIAAAAAAAAABPcmRlcnMxLnhtbAoAIAAAAAAAAQAYAAD57I2ZLtYBg97kuHn02gEA+eyNmS7WAVBLAQIfABQAAAAIAEFb2FCAF/S1WAAAACoBAAALACQAAAAAAAAAIAAAAHkAAABPcmRlcnMyLnhtbAoAIAAAAAAAAQAYAAAxPXoJStYBjn3iuHn02gEAMT16CUrWAVBLBQYAAAAAAgACALoAAAD6AAAAAAA=";
@Test
public void testIfAllSplitsAggregated() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
template.sendBody("direct:start", ""); // Check if second file
was processed in aggregate() method of AggregationStrategy
assertEquals("Orders2.xml",
mock.getExchanges().get(0).getMessage().getHeader("CamelFileName",
String.class));
} @Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
JdbcDataSource dataSource = new JdbcDataSource();
dataSource.setURL("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1");
dataSource.setUser("sa");
dataSource.setPassword("");
DataSourceTransactionManager txManager = new
DataSourceTransactionManager(dataSource); TransactionTemplate
transactionTemplate = new TransactionTemplate(txManager);
transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED");
transactionTemplate.setIsolationLevelName("ISOLATION_READ_COMMITTED");
transactionTemplate.setTimeout(1800);
SpringTransactionPolicy springTransactionPolicy = new SpringTransactionPolicy();
springTransactionPolicy.setTransactionManager(txManager);
springTransactionPolicy.setTransactionTemplate(transactionTemplate);
getContext().getRegistry().bind("transacted",
springTransactionPolicy);
getContext().getRegistry().bind("zipSplitter", new
ZipSplitter()); from("direct:start")
.transacted("transacted")
.setBody().simple(zipArchiveWithTwoFiles)
.unmarshal().base64()
.split().ref("zipSplitter").streaming().aggregationStrategy(new
StringAggregationStrategy())
.log("Splitted")
.end()
.to("mock:result");
}
};
}
private static class StringAggregationStrategy implements
AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
System.out.println(newExchange.getMessage().getHeader("CamelFileName"));
return newExchange;
}
}
}{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)