This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch zip
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 90597bb26a0b87fd3be219067bd6d71ae7e8ce0b
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Sep 2 09:37:06 2024 +0200

    CAMEL-21114: camel-zipfile - ZipSplitter with AggregationStrategy does not 
aggregate all splits in transacted mode.
---
 components/camel-zipfile/pom.xml                   | 16 ++++
 .../camel/dataformat/zipfile/ZipIterator.java      | 12 +++
 .../processor/aggregate/zipfile/DummyZip.java      | 38 +++++++++
 .../ZipSplitAggregateTransactedIssueTest.java      | 98 ++++++++++++++++++++++
 .../src/test/resources/log4j2.properties           |  2 +-
 .../apache/camel/processor/MulticastProcessor.java | 14 +++-
 6 files changed, 177 insertions(+), 3 deletions(-)

diff --git a/components/camel-zipfile/pom.xml b/components/camel-zipfile/pom.xml
index 444e1a4641a..f907b0d3e65 100644
--- a/components/camel-zipfile/pom.xml
+++ b/components/camel-zipfile/pom.xml
@@ -55,6 +55,22 @@
             <artifactId>camel-spring-xml</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-base64</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-spring-jdbc</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <version>2.3.232</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-test-spring-junit5</artifactId>
diff --git 
a/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java
 
b/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java
index 33e152de845..153bc9d3d6b 100644
--- 
a/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java
+++ 
b/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java
@@ -67,6 +67,12 @@ public class ZipIterator implements Iterator<Message>, 
Closeable {
 
     @Override
     public boolean hasNext() {
+        boolean answer = doHasNext();
+        System.out.println("hasNext: " + answer);
+        return answer;
+    }
+
+    protected boolean doHasNext() {
         try {
             if (zipInputStream == null) {
                 return false;
@@ -93,6 +99,12 @@ public class ZipIterator implements Iterator<Message>, 
Closeable {
 
     @Override
     public Message next() {
+        Message answer = doNext();
+        System.out.println("next: " + answer);
+        return answer;
+    }
+
+    protected Message doNext() {
         if (parent == null) {
             parent = getNextElement();
         }
diff --git 
a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/DummyZip.java
 
b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/DummyZip.java
new file mode 100644
index 00000000000..fdd9512d085
--- /dev/null
+++ 
b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/DummyZip.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.zipfile;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+
+public class DummyZip implements Expression {
+
+    private final Iterator list = List.of("Orders1.xml", 
"Orders2.xml").iterator();
+
+    public Object evaluate(Exchange exchange) {
+        return list;
+    }
+
+    @Override
+    public <T> T evaluate(Exchange exchange, Class<T> type) {
+        Object result = evaluate(exchange);
+        return exchange.getContext().getTypeConverter().convertTo(type, 
exchange, result);
+    }
+}
diff --git 
a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipSplitAggregateTransactedIssueTest.java
 
b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipSplitAggregateTransactedIssueTest.java
new file mode 100644
index 00000000000..52643993fb0
--- /dev/null
+++ 
b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipSplitAggregateTransactedIssueTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.zipfile;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.dataformat.zipfile.ZipSplitter;
+import org.apache.camel.spring.spi.SpringTransactionPolicy;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.h2.jdbcx.JdbcDataSource;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.transaction.support.TransactionTemplate;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ZipSplitAggregateTransactedIssueTest extends CamelTestSupport {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ZipSplitAggregateTransactedIssueTest.class);
+
+    String zipArchiveWithTwoFiles
+            = 
"UEsDBBQAAAAIAFlrtFDFAfecUAAAAB4BAAALAAAAT3JkZXJzMS54bWyzyS9KSS0qtuPl4oQwQSxOm8wUOxMb/cwUCK+gKD+lNLkEzOG0yUvMTbWDCik42uiD+WB1+kgKbfThxqEZbEqUwU6kG2xGlMHOhA2GsortAFBLAwQUAAAACABBW9hQgBf0tVgAAAAqAQAACwAAAE9yZGVyczIueG1ss8kvSkktKrbj5eKEMEEsTpvMFDtDQ0Mb/cwUCL+gKD+lNLkEzOG0yUvMTbWDCimA1YFFwCr1kZTa6MONRDPcyMiIKMPB6kg13NjYmCjDweoIGQ5lFdsBAFBLAQIfABQAAAAIAFlrtFDFAfecUAAAAB4BAAALACQAAAAAAAAAIAAAAAAAAABPcmRlcnMxLnhtbAoAIAAAAAAAAQAYAAD57I2ZLtYBg97kuHn02gEA+eyNmS7WAVBLAQIfABQAAAAIAEFb2
 [...]
+
+    @Test
+    public void testIfAllSplitsAggregated() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        template.sendBody("direct:start", "");
+
+        // Check if second file was processed in aggregate() method of 
AggregationStrategy
+        assertEquals("Orders2.xml", 
mock.getExchanges().get(0).getMessage().getHeader("CamelFileName", 
String.class));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                JdbcDataSource dataSource = new JdbcDataSource();
+                dataSource.setURL("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1");
+                dataSource.setUser("sa");
+                dataSource.setPassword("");
+
+                DataSourceTransactionManager txManager = new 
DataSourceTransactionManager(dataSource);
+
+                TransactionTemplate transactionTemplate = new 
TransactionTemplate(txManager);
+                
transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED");
+                
transactionTemplate.setIsolationLevelName("ISOLATION_READ_COMMITTED");
+                transactionTemplate.setTimeout(1800);
+
+                SpringTransactionPolicy springTransactionPolicy = new 
SpringTransactionPolicy();
+                springTransactionPolicy.setTransactionManager(txManager);
+                
springTransactionPolicy.setTransactionTemplate(transactionTemplate);
+
+                getContext().getRegistry().bind("transacted", 
springTransactionPolicy);
+                getContext().getRegistry().bind("zipSplitter", new 
ZipSplitter());
+                //                
getContext().getRegistry().bind("zipSplitter", new DummyZip());
+
+                from("direct:start")
+                        .transacted("transacted")
+                        .setBody().simple(zipArchiveWithTwoFiles)
+                        .unmarshal().base64()
+                        
.split().ref("zipSplitter").streaming().aggregationStrategy(new 
StringAggregationStrategy())
+                            .log("Splitting ${header.CamelFileName}")
+                        .end()
+                        .to("mock:result");
+            }
+        };
+    }
+
+    private static class StringAggregationStrategy implements 
AggregationStrategy {
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            String name = newExchange.getMessage().getHeader("CamelFileName", 
String.class);
+            //            String name = 
newExchange.getMessage().getBody(String.class);
+            LOG.info("Aggregating {}", name);
+            return newExchange;
+        }
+    }
+}
diff --git a/components/camel-zipfile/src/test/resources/log4j2.properties 
b/components/camel-zipfile/src/test/resources/log4j2.properties
index 32fa9311108..55263e9ebcb 100644
--- a/components/camel-zipfile/src/test/resources/log4j2.properties
+++ b/components/camel-zipfile/src/test/resources/log4j2.properties
@@ -25,4 +25,4 @@ appender.out.name = out
 appender.out.layout.type = PatternLayout
 appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
 rootLogger.level = INFO
-rootLogger.appenderRef.file.ref = file
+rootLogger.appenderRef.file.ref = out
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 29dfa26857b..da2d59d1067 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -469,6 +469,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         }
 
         protected void aggregate() {
+            System.out.println("aggregate");
             Lock lock = this.lock;
             if (lock.tryLock()) {
                 try {
@@ -667,9 +668,15 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
                 } catch (Exception e) {
                     original.setException(e);
                     doDone(null, false);
-                    return;
+                    next = false;
                 }
             }
+            // execute any pending tasks
+            //            while (reactiveExecutor.executeFromQueue()) {
+            //                System.out.println("pending work");
+            //            }
+            // fail-safe to ensure we trigger done
+            doDone(null, false);
         }
 
         boolean doRun() throws Exception {
@@ -687,8 +694,11 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
             }
 
             ProcessorExchangePair pair = iterator.next();
-            boolean hasNext = iterator.hasNext();
+            if (pair == null) {
+                return true;
+            }
 
+            boolean hasNext = iterator.hasNext();
             Exchange exchange = pair.getExchange();
             int index = nbExchangeSent.getAndIncrement();
             updateNewExchange(exchange, index, pairs, hasNext);

Reply via email to