This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit f3eca85d24736dae6afd0c6cf1b57bbe85affb71
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sun Jan 19 11:50:10 2020 +0100

    CAMEL-14354: Optimize core
---
 .../main/java/org/apache/camel/processor/Pipeline.java | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index e646a45..8e2f5f3 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -18,7 +18,6 @@ package org.apache.camel.processor;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -83,25 +82,26 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         if (exchange.isTransacted()) {
-            camelContext.getReactiveExecutor().scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true));
+            camelContext.getReactiveExecutor().scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors, 0, true));
         } else {
-            camelContext.getReactiveExecutor().scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true));
+            camelContext.getReactiveExecutor().scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors, 0, true));
         }
         return false;
     }
 
-    protected void doProcess(Exchange exchange, AsyncCallback callback, Iterator<AsyncProcessor> processors, boolean first) {
-        if (continueRouting(processors, exchange)
+    protected void doProcess(Exchange exchange, AsyncCallback callback, List<AsyncProcessor> processors, int index, boolean first) {
+        if (continueRouting(processors, index, exchange)
                 && (first || continueProcessing(exchange, "so breaking out of pipeline", log))) {
 
             // prepare for next run
             ExchangeHelper.prepareOutToIn(exchange);
 
             // get the next processor
-            AsyncProcessor processor = processors.next();
+            AsyncProcessor processor = processors.get(index);
+            final Integer idx = index + 1;
 
             processor.process(exchange, doneSync ->
-                    camelContext.getReactiveExecutor().schedule(() -> doProcess(exchange, callback, processors, false)));
+                    camelContext.getReactiveExecutor().schedule(() -> doProcess(exchange, callback, processors, idx, false)));
         } else {
             ExchangeHelper.copyResults(exchange, exchange);
 
@@ -114,7 +114,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
         }
     }
 
-    protected boolean continueRouting(Iterator<AsyncProcessor> it, Exchange exchange) {
+    protected boolean continueRouting(List<AsyncProcessor> processors, int index, Exchange exchange) {
         Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
         if (stop != null) {
             boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
@@ -124,7 +124,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
             }
         }
         // continue if there are more processors to route
-        boolean answer = it.hasNext();
+        boolean answer = index < processors.size();
         log.trace("ExchangeId: {} should continue routing: {}", exchange.getExchangeId(), answer);
         return answer;
     }