This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 1f40dd6d373 CAMEL-21114: Fix multicast EIP in transacted mode with camel-zipfile (#15390)
1f40dd6d373 is described below
commit 1f40dd6d373eeaaa3bb1f857e3d46a030554d2f8
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Sep 2 13:13:48 2024 +0200
CAMEL-21114: Fix multicast EIP in transacted mode with camel-zipfile (#15390)
* CAMEL-21114: camel-zipfile - ZipSplitter with AggregationStrategy does not aggregate all splits in transacted mode.
Thanks to Andre Weickel for the unit test.
---
components/camel-zipfile/pom.xml | 16 ++++
.../camel/dataformat/zipfile/ZipIterator.java | 18 +++-
.../ZipSplitAggregateTransactedIssueTest.java | 99 ++++++++++++++++++++++
.../apache/camel/processor/MulticastProcessor.java | 34 ++++++--
4 files changed, 157 insertions(+), 10 deletions(-)
diff --git a/components/camel-zipfile/pom.xml b/components/camel-zipfile/pom.xml
index 444e1a4641a..f907b0d3e65 100644
--- a/components/camel-zipfile/pom.xml
+++ b/components/camel-zipfile/pom.xml
@@ -55,6 +55,22 @@
<artifactId>camel-spring-xml</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-base64</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-spring-jdbc</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <version>2.3.232</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test-spring-junit5</artifactId>
diff --git
a/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java
b/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java
index 33e152de845..974b8d5b94d 100644
---
a/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java
+++
b/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
* <a
href="http://camel.465427.n5.nabble.com/zip-file-best-practices-td5713437.html">zip
file best practices</a>
*/
public class ZipIterator implements Iterator<Message>, Closeable {
- static final Logger LOGGER = LoggerFactory.getLogger(ZipIterator.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(ZipIterator.class);
private final Exchange exchange;
private boolean allowEmptyDirectory;
@@ -67,6 +67,12 @@ public class ZipIterator implements Iterator<Message>,
Closeable {
@Override
public boolean hasNext() {
+ boolean answer = doHasNext();
+ LOG.trace("hasNext: {}", answer);
+ return answer;
+ }
+
+ protected boolean doHasNext() {
try {
if (zipInputStream == null) {
return false;
@@ -93,6 +99,12 @@ public class ZipIterator implements Iterator<Message>,
Closeable {
@Override
public Message next() {
+ Message answer = doNext();
+ LOG.trace("next: {}", answer);
+ return answer;
+ }
+
+ protected Message doNext() {
if (parent == null) {
parent = getNextElement();
}
@@ -118,7 +130,7 @@ public class ZipIterator implements Iterator<Message>,
Closeable {
ZipEntry current = getNextEntry();
if (current != null) {
- LOGGER.debug("read zipEntry {}", current.getName());
+ LOG.debug("read zipEntry {}", current.getName());
Message answer = new DefaultMessage(exchange.getContext());
answer.getHeaders().putAll(exchange.getIn().getHeaders());
answer.setHeader("zipFileName", current.getName());
@@ -126,7 +138,7 @@ public class ZipIterator implements Iterator<Message>,
Closeable {
answer.setBody(new ZipInputStreamWrapper(zipInputStream));
return answer;
} else {
- LOGGER.trace("close zipInputStream");
+ LOG.trace("close zipInputStream");
return null;
}
} catch (IOException exception) {
diff --git
a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipSplitAggregateTransactedIssueTest.java
b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipSplitAggregateTransactedIssueTest.java
new file mode 100644
index 00000000000..3d9ef361185
--- /dev/null
+++
b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipSplitAggregateTransactedIssueTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.zipfile;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.dataformat.zipfile.ZipSplitter;
+import org.apache.camel.spring.spi.SpringTransactionPolicy;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.h2.jdbcx.JdbcDataSource;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.transaction.support.TransactionTemplate;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ZipSplitAggregateTransactedIssueTest extends CamelTestSupport {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ZipSplitAggregateTransactedIssueTest.class);
+
+ String zipArchiveWithTwoFiles
+ =
"UEsDBBQAAAAIAFlrtFDFAfecUAAAAB4BAAALAAAAT3JkZXJzMS54bWyzyS9KSS0qtuPl4oQwQSxOm8wUOxMb/cwUCK+gKD+lNLkEzOG0yUvMTbWDCik42uiD+WB1+kgKbfThxqEZbEqUwU6kG2xGlMHOhA2GsortAFBLAwQUAAAACABBW9hQgBf0tVgAAAAqAQAACwAAAE9yZGVyczIueG1ss8kvSkktKrbj5eKEMEEsTpvMFDtDQ0Mb/cwUCL+gKD+lNLkEzOG0yUvMTbWDCimA1YFFwCr1kZTa6MONRDPcyMiIKMPB6kg13NjYmCjDweoIGQ5lFdsBAFBLAQIfABQAAAAIAFlrtFDFAfecUAAAAB4BAAALACQAAAAAAAAAIAAAAAAAAABPcmRlcnMxLnhtbAoAIAAAAAAAAQAYAAD57I2ZLtYBg97kuHn02gEA+eyNmS7WAVBLAQIfABQAAAAIAEFb2
[...]
+
+ @Test
+ public void testIfAllSplitsAggregated() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+
+ template.sendBody("direct:start", "");
+
+ mock.assertIsSatisfied();
+
+ // Check if second file was processed in aggregate() method of
AggregationStrategy
+ assertEquals("Orders2.xml",
mock.getExchanges().get(0).getMessage().getHeader("CamelFileName",
String.class));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+
+ JdbcDataSource dataSource = new JdbcDataSource();
+ dataSource.setURL("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1");
+ dataSource.setUser("sa");
+ dataSource.setPassword("");
+
+ DataSourceTransactionManager txManager = new
DataSourceTransactionManager(dataSource);
+
+ TransactionTemplate transactionTemplate = new
TransactionTemplate(txManager);
+
transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED");
+
transactionTemplate.setIsolationLevelName("ISOLATION_READ_COMMITTED");
+ transactionTemplate.setTimeout(1800);
+
+ SpringTransactionPolicy springTransactionPolicy = new
SpringTransactionPolicy();
+ springTransactionPolicy.setTransactionManager(txManager);
+
springTransactionPolicy.setTransactionTemplate(transactionTemplate);
+
+ getContext().getRegistry().bind("transacted",
springTransactionPolicy);
+ getContext().getRegistry().bind("zipSplitter", new
ZipSplitter());
+
+ from("direct:start")
+ .transacted("transacted")
+ .setBody().simple(zipArchiveWithTwoFiles)
+ .unmarshal().base64()
+
.split().ref("zipSplitter").streaming().aggregationStrategy(new
StringAggregationStrategy())
+ .log("Splitting ${header.CamelFileName}")
+ .end()
+ .to("mock:result");
+ }
+ };
+ }
+
+ private static class StringAggregationStrategy implements
AggregationStrategy {
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ String name = newExchange.getMessage().getHeader("CamelFileName",
String.class);
+ LOG.info("Aggregating {}", name);
+ return newExchange;
+ }
+ }
+}
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 29dfa26857b..ea91c04a4e3 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -147,7 +147,15 @@ public class MulticastProcessor extends
AsyncProcessorSupport
@Override
public void execute(Runnable command) {
- schedule(command);
+ schedule(command, false);
+ }
+ }
+
+ private final class SyncScheduler implements Executor {
+
+ @Override
+ public void execute(Runnable command) {
+ schedule(command, true);
}
}
@@ -172,6 +180,7 @@ public class MulticastProcessor extends
AsyncProcessorSupport
private final ExecutorService executorService;
private final boolean shutdownExecutorService;
private final Scheduler scheduler = new Scheduler();
+ private final SyncScheduler syncScheduler = new SyncScheduler();
private ExecutorService aggregateExecutorService;
private boolean shutdownAggregateExecutorService;
private final long timeout;
@@ -370,6 +379,10 @@ public class MulticastProcessor extends
AsyncProcessorSupport
}
protected void schedule(final Runnable runnable) {
+ schedule(runnable, false);
+ }
+
+ protected void schedule(final Runnable runnable, boolean sync) {
if (isParallelProcessing()) {
Runnable task = prepareParallelTask(runnable);
try {
@@ -379,6 +392,8 @@ public class MulticastProcessor extends
AsyncProcessorSupport
rej.reject();
}
}
+ } else if (sync) {
+ reactiveExecutor.scheduleSync(runnable);
} else {
reactiveExecutor.schedule(runnable);
}
@@ -431,7 +446,8 @@ public class MulticastProcessor extends
AsyncProcessorSupport
final Map<String, String> mdc;
final ScheduledFuture<?> timeoutTask;
- MulticastTask(Exchange original, Iterable<ProcessorExchangePair>
pairs, AsyncCallback callback, int capacity) {
+ MulticastTask(Exchange original, Iterable<ProcessorExchangePair>
pairs, AsyncCallback callback, int capacity,
+ boolean sync) {
this.original = original;
this.pairs = pairs;
this.callback = callback;
@@ -450,9 +466,10 @@ public class MulticastProcessor extends
AsyncProcessorSupport
this.mdc = null;
}
if (capacity > 0) {
- this.completion = new AsyncCompletionService<>(scheduler,
!isStreaming(), lock, capacity);
+ this.completion
+ = new AsyncCompletionService<>(sync ? syncScheduler :
scheduler, !isStreaming(), lock, capacity);
} else {
- this.completion = new AsyncCompletionService<>(scheduler,
!isStreaming(), lock);
+ this.completion = new AsyncCompletionService<>(sync ?
syncScheduler : scheduler, !isStreaming(), lock);
}
}
@@ -556,7 +573,7 @@ public class MulticastProcessor extends
AsyncProcessorSupport
public MulticastReactiveTask(Exchange original,
Iterable<ProcessorExchangePair> pairs, AsyncCallback callback,
int size) {
- super(original, pairs, callback, size);
+ super(original, pairs, callback, size, false);
}
@Override
@@ -653,7 +670,7 @@ public class MulticastProcessor extends
AsyncProcessorSupport
public MulticastTransactedTask(Exchange original,
Iterable<ProcessorExchangePair> pairs, AsyncCallback callback,
int size) {
- super(original, pairs, callback, size);
+ super(original, pairs, callback, size, true);
}
@Override
@@ -687,8 +704,11 @@ public class MulticastProcessor extends
AsyncProcessorSupport
}
ProcessorExchangePair pair = iterator.next();
- boolean hasNext = iterator.hasNext();
+ if (pair == null) {
+ return true; // go again to check hasNext
+ }
+ boolean hasNext = iterator.hasNext();
Exchange exchange = pair.getExchange();
int index = nbExchangeSent.getAndIncrement();
updateNewExchange(exchange, index, pairs, hasNext);