This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 17fa7056b88 Fix multicast iterator to be lazy again (#14169) 17fa7056b88 is described below commit 17fa7056b886f29b890fb75c34e321f9ac5f2d0e Author: Guillaume Nodet <gno...@gmail.com> AuthorDate: Fri May 17 08:22:13 2024 +0200 Fix multicast iterator to be lazy again (#14169) If the last element of the iterable is null, the last message will not contain the final information such as splitter-size, multicast-complete --- .../apache/camel/processor/MulticastProcessor.java | 22 ++++++++++++++-------- .../java/org/apache/camel/processor/Splitter.java | 2 +- ...rParallelWithIteratorThrowingExceptionTest.java | 4 ++-- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 4f1fe2bdddb..78560ad781a 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -70,7 +70,6 @@ import org.apache.camel.support.PatternHelper; import org.apache.camel.support.PluginHelper; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.CastUtils; -import org.apache.camel.util.FilterIterator; import org.apache.camel.util.IOHelper; import org.apache.camel.util.StopWatch; import org.apache.camel.util.concurrent.AsyncCompletionService; @@ -313,6 +312,8 @@ public class MulticastProcessor extends AsyncProcessorSupport try { pairs = createProcessorExchangePairs(exchange); if (pairs instanceof Collection) { + pairs = ((Collection<ProcessorExchangePair>) pairs) + .stream().filter(Objects::nonNull).toList(); size = ((Collection<ProcessorExchangePair>) pairs).size(); } } catch (Exception e) { @@ -409,7 +410,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport this.original = original; this.pairs = pairs; this.callback = callback; - this.iterator = new FilterIterator<>(pairs.iterator(), Objects::nonNull); + this.iterator = pairs.iterator(); if (timeout > 0) { timeoutTask = schedule(aggregateExecutorService, this::timeout, timeout, TimeUnit.MILLISECONDS); } else { @@ -535,16 +536,13 @@ public class MulticastProcessor extends AsyncProcessorSupport return; } - // Check if the iterator is empty - // This can happen the very first time we check the existence - // of an item before queuing the run. - // or some iterators may return true for hasNext() but then null in next() - if (!iterator.hasNext()) { + // Get next processor exchange pair to sent, skipping null ones + ProcessorExchangePair pair = getNextProcessorExchangePair(); + if (pair == null) { doDone(result.get(), true); return; } - ProcessorExchangePair pair = iterator.next(); boolean hasNext = iterator.hasNext(); Exchange exchange = pair.getExchange(); @@ -605,6 +603,14 @@ public class MulticastProcessor extends AsyncProcessorSupport doDone(null, false); } } + + private ProcessorExchangePair getNextProcessorExchangePair() { + ProcessorExchangePair tpair = null; + while (tpair == null && iterator.hasNext()) { + tpair = iterator.next(); + } + return tpair; + } } /** diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java index 72c557ccbe2..0b513472ee0 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java @@ -295,7 +295,7 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs, boolean hasNext) { exchange.setProperty(ExchangePropertyKey.SPLIT_INDEX, index); if (allPairs 
instanceof Collection) { - // non streaming mode, so we know the total size already + // non-streaming mode, so we know the total size already exchange.setProperty(ExchangePropertyKey.SPLIT_SIZE, ((Collection<?>) allPairs).size()); } if (hasNext) { diff --git a/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java b/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java index ccd568d9572..517d52bc459 100644 --- a/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithIteratorThrowingExceptionTest.java @@ -49,7 +49,7 @@ public class SplitterParallelWithIteratorThrowingExceptionTest extends ContextTe @Test public void testIteratorThrowExceptionOnSecond() throws Exception { - getMockEndpoint("mock:line").expectedMessageCount(0); + getMockEndpoint("mock:line").expectedMessageCount(1); getMockEndpoint("mock:end").expectedMessageCount(0); try { @@ -65,7 +65,7 @@ public class SplitterParallelWithIteratorThrowingExceptionTest extends ContextTe @Test public void testIteratorThrowExceptionOnThird() throws Exception { - getMockEndpoint("mock:line").expectedMessageCount(1); + getMockEndpoint("mock:line").expectedMessageCount(2); getMockEndpoint("mock:end").expectedMessageCount(0); try {