This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 17fa7056b88 Fix multicast iterator to be lazy again (#14169)
17fa7056b88 is described below

commit 17fa7056b886f29b890fb75c34e321f9ac5f2d0e
Author: Guillaume Nodet <gno...@gmail.com>
AuthorDate: Fri May 17 08:22:13 2024 +0200

    Fix multicast iterator to be lazy again (#14169)
    
    If the last element of the iterable is null, the last message will not
    contain the final information, such as splitter-size and multicast-complete
---
 .../apache/camel/processor/MulticastProcessor.java | 22 ++++++++++++++--------
 .../java/org/apache/camel/processor/Splitter.java  |  2 +-
 ...rParallelWithIteratorThrowingExceptionTest.java |  4 ++--
 3 files changed, 17 insertions(+), 11 deletions(-)

diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 4f1fe2bdddb..78560ad781a 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -70,7 +70,6 @@ import org.apache.camel.support.PatternHelper;
 import org.apache.camel.support.PluginHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.CastUtils;
-import org.apache.camel.util.FilterIterator;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.concurrent.AsyncCompletionService;
@@ -313,6 +312,8 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         try {
             pairs = createProcessorExchangePairs(exchange);
             if (pairs instanceof Collection) {
+                pairs = ((Collection<ProcessorExchangePair>) pairs)
+                        .stream().filter(Objects::nonNull).toList();
                 size = ((Collection<ProcessorExchangePair>) pairs).size();
             }
         } catch (Exception e) {
@@ -409,7 +410,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
             this.original = original;
             this.pairs = pairs;
             this.callback = callback;
-            this.iterator = new FilterIterator<>(pairs.iterator(), 
Objects::nonNull);
+            this.iterator = pairs.iterator();
             if (timeout > 0) {
                 timeoutTask = schedule(aggregateExecutorService, 
this::timeout, timeout, TimeUnit.MILLISECONDS);
             } else {
@@ -535,16 +536,13 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
                     return;
                 }
 
-                // Check if the iterator is empty
-                // This can happen the very first time we check the existence
-                // of an item before queuing the run.
-                // or some iterators may return true for hasNext() but then 
null in next()
-                if (!iterator.hasNext()) {
+                // Get next processor exchange pair to send, skipping null ones
+                ProcessorExchangePair pair = getNextProcessorExchangePair();
+                if (pair == null) {
                     doDone(result.get(), true);
                     return;
                 }
 
-                ProcessorExchangePair pair = iterator.next();
                 boolean hasNext = iterator.hasNext();
 
                 Exchange exchange = pair.getExchange();
@@ -605,6 +603,14 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
                 doDone(null, false);
             }
         }
+
+        private ProcessorExchangePair getNextProcessorExchangePair() {
+            ProcessorExchangePair tpair = null;
+            while (tpair == null && iterator.hasNext()) {
+                tpair = iterator.next();
+            }
+            return tpair;
+        }
     }
 
     /**
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index 72c557ccbe2..0b513472ee0 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -295,7 +295,7 @@ public class Splitter extends MulticastProcessor implements 
AsyncProcessor, Trac
     protected void updateNewExchange(Exchange exchange, int index, 
Iterable<ProcessorExchangePair> allPairs, boolean hasNext) {
         exchange.setProperty(ExchangePropertyKey.SPLIT_INDEX, index);
         if (allPairs instanceof Collection) {
-            // non streaming mode, so we know the total size already
+            // non-streaming mode, so we know the total size already
             exchange.setProperty(ExchangePropertyKey.SPLIT_SIZE, 
((Collection<?>) allPairs).size());
         }
         if (hasNext) {
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java
index ccd568d9572..517d52bc459 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java
@@ -49,7 +49,7 @@ public class 
SplitterParallelWithIteratorThrowingExceptionTest extends ContextTe
 
     @Test
     public void testIteratorThrowExceptionOnSecond() throws Exception {
-        getMockEndpoint("mock:line").expectedMessageCount(0);
+        getMockEndpoint("mock:line").expectedMessageCount(1);
         getMockEndpoint("mock:end").expectedMessageCount(0);
 
         try {
@@ -65,7 +65,7 @@ public class 
SplitterParallelWithIteratorThrowingExceptionTest extends ContextTe
 
     @Test
     public void testIteratorThrowExceptionOnThird() throws Exception {
-        getMockEndpoint("mock:line").expectedMessageCount(1);
+        getMockEndpoint("mock:line").expectedMessageCount(2);
         getMockEndpoint("mock:end").expectedMessageCount(0);
 
         try {

Reply via email to