This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch zip in repository https://gitbox.apache.org/repos/asf/camel.git
commit 90597bb26a0b87fd3be219067bd6d71ae7e8ce0b Author: Claus Ibsen <[email protected]> AuthorDate: Mon Sep 2 09:37:06 2024 +0200 CAMEL-21114: camel-zipfile - ZipSplitter with AggregationStrategy does not aggregate all splits in transacted mode. --- components/camel-zipfile/pom.xml | 16 ++++ .../camel/dataformat/zipfile/ZipIterator.java | 12 +++ .../processor/aggregate/zipfile/DummyZip.java | 38 +++++++++ .../ZipSplitAggregateTransactedIssueTest.java | 98 ++++++++++++++++++++++ .../src/test/resources/log4j2.properties | 2 +- .../apache/camel/processor/MulticastProcessor.java | 14 +++- 6 files changed, 177 insertions(+), 3 deletions(-) diff --git a/components/camel-zipfile/pom.xml b/components/camel-zipfile/pom.xml index 444e1a4641a..f907b0d3e65 100644 --- a/components/camel-zipfile/pom.xml +++ b/components/camel-zipfile/pom.xml @@ -55,6 +55,22 @@ <artifactId>camel-spring-xml</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-base64</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-spring-jdbc</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <version>2.3.232</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-test-spring-junit5</artifactId> diff --git a/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java b/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java index 33e152de845..153bc9d3d6b 100644 --- a/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java +++ b/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java @@ -67,6 +67,12 @@ public class ZipIterator implements Iterator<Message>, Closeable { @Override public boolean hasNext() { + boolean answer = doHasNext(); + System.out.println("hasNext: " + answer); + return answer; + } + + protected boolean doHasNext() { try { if (zipInputStream == null) { return false; @@ -93,6 +99,12 @@ public class ZipIterator implements Iterator<Message>, Closeable { @Override public Message next() { + Message answer = doNext(); + System.out.println("next: " + answer); + return answer; + } + + protected Message doNext() { if (parent == null) { parent = getNextElement(); } diff --git a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/DummyZip.java b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/DummyZip.java new file mode 100644 index 00000000000..fdd9512d085 --- /dev/null +++ b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/DummyZip.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate.zipfile; + +import java.util.Iterator; +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.Expression; + +public class DummyZip implements Expression { + + private final Iterator list = List.of("Orders1.xml", "Orders2.xml").iterator(); + + public Object evaluate(Exchange exchange) { + return list; + } + + @Override + public <T> T evaluate(Exchange exchange, Class<T> type) { + Object result = evaluate(exchange); + return exchange.getContext().getTypeConverter().convertTo(type, exchange, result); + } +} diff --git a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipSplitAggregateTransactedIssueTest.java b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipSplitAggregateTransactedIssueTest.java new file mode 100644 index 00000000000..52643993fb0 --- /dev/null +++ b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipSplitAggregateTransactedIssueTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate.zipfile; + +import org.apache.camel.AggregationStrategy; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.dataformat.zipfile.ZipSplitter; +import org.apache.camel.spring.spi.SpringTransactionPolicy; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.h2.jdbcx.JdbcDataSource; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.transaction.support.TransactionTemplate; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ZipSplitAggregateTransactedIssueTest extends CamelTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(ZipSplitAggregateTransactedIssueTest.class); + + String zipArchiveWithTwoFiles + = "UEsDBBQAAAAIAFlrtFDFAfecUAAAAB4BAAALAAAAT3JkZXJzMS54bWyzyS9KSS0qtuPl4oQwQSxOm8wUOxMb/cwUCK+gKD+lNLkEzOG0yUvMTbWDCik42uiD+WB1+kgKbfThxqEZbEqUwU6kG2xGlMHOhA2GsortAFBLAwQUAAAACABBW9hQgBf0tVgAAAAqAQAACwAAAE9yZGVyczIueG1ss8kvSkktKrbj5eKEMEEsTpvMFDtDQ0Mb/cwUCL+gKD+lNLkEzOG0yUvMTbWDCimA1YFFwCr1kZTa6MONRDPcyMiIKMPB6kg13NjYmCjDweoIGQ5lFdsBAFBLAQIfABQAAAAIAFlrtFDFAfecUAAAAB4BAAALACQAAAAAAAAAIAAAAAAAAABPcmRlcnMxLnhtbAoAIAAAAAAAAQAYAAD57I2ZLtYBg97kuHn02gEA+eyNmS7WAVBLAQIfABQAAAAIAEFb2 [...] + + @Test + public void testIfAllSplitsAggregated() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + template.sendBody("direct:start", ""); + + // Check if second file was processed in aggregate() method of AggregationStrategy + assertEquals("Orders2.xml", mock.getExchanges().get(0).getMessage().getHeader("CamelFileName", String.class)); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + + JdbcDataSource dataSource = new JdbcDataSource(); + dataSource.setURL("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1"); + dataSource.setUser("sa"); + dataSource.setPassword(""); + + DataSourceTransactionManager txManager = new DataSourceTransactionManager(dataSource); + + TransactionTemplate transactionTemplate = new TransactionTemplate(txManager); + transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED"); + transactionTemplate.setIsolationLevelName("ISOLATION_READ_COMMITTED"); + transactionTemplate.setTimeout(1800); + + SpringTransactionPolicy springTransactionPolicy = new SpringTransactionPolicy(); + springTransactionPolicy.setTransactionManager(txManager); + springTransactionPolicy.setTransactionTemplate(transactionTemplate); + + getContext().getRegistry().bind("transacted", springTransactionPolicy); + getContext().getRegistry().bind("zipSplitter", new ZipSplitter()); + // getContext().getRegistry().bind("zipSplitter", new DummyZip()); + + from("direct:start") + .transacted("transacted") + .setBody().simple(zipArchiveWithTwoFiles) + .unmarshal().base64() + .split().ref("zipSplitter").streaming().aggregationStrategy(new StringAggregationStrategy()) + .log("Splitting ${header.CamelFileName}") + .end() + .to("mock:result"); + } + }; + } + + private static class StringAggregationStrategy implements AggregationStrategy { + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + String name = newExchange.getMessage().getHeader("CamelFileName", String.class); + // String name = newExchange.getMessage().getBody(String.class); + LOG.info("Aggregating {}", name); + return newExchange; + } + } +} diff --git a/components/camel-zipfile/src/test/resources/log4j2.properties b/components/camel-zipfile/src/test/resources/log4j2.properties index 32fa9311108..55263e9ebcb 100644 --- a/components/camel-zipfile/src/test/resources/log4j2.properties +++ b/components/camel-zipfile/src/test/resources/log4j2.properties @@ -25,4 +25,4 @@ appender.out.name = out appender.out.layout.type = PatternLayout appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n rootLogger.level = INFO -rootLogger.appenderRef.file.ref = file +rootLogger.appenderRef.file.ref = out diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 29dfa26857b..da2d59d1067 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -469,6 +469,7 @@ public class MulticastProcessor extends AsyncProcessorSupport } protected void aggregate() { + System.out.println("aggregate"); Lock lock = this.lock; if (lock.tryLock()) { try { @@ -667,9 +668,15 @@ public class MulticastProcessor extends AsyncProcessorSupport } catch (Exception e) { original.setException(e); doDone(null, false); - return; + next = false; } } + // execute any pending tasks + // while (reactiveExecutor.executeFromQueue()) { + // System.out.println("pending work"); + // } + // fail-safe to ensure we trigger done + doDone(null, false); } boolean doRun() throws Exception { @@ -687,8 +694,11 @@ public class MulticastProcessor extends AsyncProcessorSupport } ProcessorExchangePair pair = iterator.next(); - boolean hasNext = iterator.hasNext(); + if (pair == null) { + return true; + } + boolean hasNext = iterator.hasNext(); Exchange exchange = pair.getExchange(); int index = nbExchangeSent.getAndIncrement(); updateNewExchange(exchange, index, pairs, hasNext);
