This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit a0a5b39c9196ff7ec583b0a759d5119078f96d15 Author: Claus Ibsen <[email protected]> AuthorDate: Mon Jan 27 12:42:02 2020 +0100 CAMEL-14354: camel-core optimize --- .../src/main/java/org/apache/camel/Exchange.java | 1 + .../java/org/apache/camel/ExtendedExchange.java | 10 +++++ .../engine/DefaultAsyncProcessorAwaitManager.java | 2 +- .../errorhandler/RedeliveryErrorHandler.java | 46 ++++++++++------------ .../org/apache/camel/support/DefaultExchange.java | 13 +++++- .../org/apache/camel/support/ExchangeHelper.java | 11 ------ 6 files changed, 44 insertions(+), 39 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java b/core/camel-api/src/main/java/org/apache/camel/Exchange.java index 0c4da07..364722f 100644 --- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java +++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java @@ -166,6 +166,7 @@ public interface Exchange { String INTERCEPTED_ENDPOINT = "CamelInterceptedEndpoint"; String INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED = "CamelInterceptSendToEndpointWhenMatched"; + @Deprecated String INTERRUPTED = "CamelInterrupted"; String LANGUAGE_SCRIPT = "CamelLanguageScript"; diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java index 840cc93..e3681e9 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java @@ -108,4 +108,14 @@ public interface ExtendedExchange extends Exchange { */ void setNotifyEvent(boolean notifyEvent); + /** + * Whether the exchange was interrupted (InterruptException) during routing. + */ + boolean isInterrupted(); + + /** + * Used to signal that this exchange was interrupted (InterruptException) during routing. + */ + void setInterrupted(boolean interrupted); + } diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java index fb67842..fade8a4 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java @@ -191,7 +191,7 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements interruptedCounter.incrementAndGet(); } exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId())); - exchange.setProperty(Exchange.INTERRUPTED, Boolean.TRUE); + exchange.adapt(ExtendedExchange.class).setInterrupted(true); entry.getLatch().countDown(); } } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index 30d2eab..ca010ff 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -30,6 +30,7 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.ExtendedExchange; import org.apache.camel.LoggingLevel; import org.apache.camel.Message; import org.apache.camel.Navigate; @@ -276,35 +277,26 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme * Strategy to determine if the exchange is done so we can continue */ protected boolean isDone(Exchange exchange) { - boolean answer = isCancelledOrInterrupted(exchange); + if (((ExtendedExchange) exchange).isInterrupted()) { + // mark the exchange to stop continue routing when interrupted + // as we do not want to continue routing (for example a task has been cancelled) + if (LOG.isTraceEnabled()) { + LOG.trace("Is exchangeId: {} interrupted? true", exchange.getExchangeId()); + } + exchange.setRouteStop(true); + return true; + } // only done if the exchange hasn't failed // and it has not been handled by the failure processor // or we are exhausted - if (!answer) { - answer = exchange.getException() == null - || ExchangeHelper.isFailureHandled(exchange) - || ExchangeHelper.isRedeliveryExhausted(exchange); - } - - LOG.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer); - return answer; - } + boolean answer = exchange.getException() == null + || ExchangeHelper.isFailureHandled(exchange) + || ExchangeHelper.isRedeliveryExhausted(exchange); - /** - * Strategy to determine if the exchange was cancelled or interrupted - */ - protected boolean isCancelledOrInterrupted(Exchange exchange) { - boolean answer = false; - - if (ExchangeHelper.isInterrupted(exchange)) { - // mark the exchange to stop continue routing when interrupted - // as we do not want to continue routing (for example a task has been cancelled) - exchange.setRouteStop(true); - answer = true; + if (LOG.isTraceEnabled()) { + LOG.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer); } - - LOG.trace("Is exchangeId: {} interrupted? {}", exchange.getExchangeId(), answer); return answer; } @@ -386,14 +378,14 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the // original Exchange is being redelivered, and not a mutated Exchange - this.original = defensiveCopyExchangeIfNeeded(exchange); + this.original = redeliveryEnabled ? defensiveCopyExchangeIfNeeded(exchange) : null; this.exchange = exchange; this.callback = callback; } @Override public String toString() { - return "Step[" + exchange.getExchangeId() + "," + RedeliveryErrorHandler.this + "]"; + return "RedeliveryState"; } /** @@ -564,7 +556,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // process the exchange (also redelivery) outputAsync.process(exchange, doneSync -> { - LOG.trace("Redelivering exchangeId: {}", exchange.getExchangeId()); + if (LOG.isTraceEnabled()) { + LOG.trace("Redelivering exchangeId: {}", exchange.getExchangeId()); + } // only process if the exchange hasn't failed // and it has not been handled by the error processor diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java index e78e59b..3961ec7 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java @@ -62,6 +62,7 @@ public final class DefaultExchange implements ExtendedExchange { private boolean rollbackOnly; private boolean rollbackOnlyLast; private boolean notifyEvent; + private boolean interrupted; public DefaultExchange(CamelContext context) { this(context, ExchangePattern.InOnly); @@ -411,7 +412,7 @@ public final class DefaultExchange implements ExtendedExchange { } if (t instanceof InterruptedException) { // mark the exchange as interrupted due to the interrupt exception - setProperty(Exchange.INTERRUPTED, Boolean.TRUE); + setInterrupted(true); } } @@ -628,6 +629,16 @@ public final class DefaultExchange implements ExtendedExchange { this.notifyEvent = notifyEvent; } + @Override + public boolean isInterrupted() { + return interrupted; + } + + @Override + public void setInterrupted(boolean interrupted) { + this.interrupted = interrupted; + } + /** * Configures the message after it has been set on the exchange */ diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java index 5f6b938..46a9105 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java @@ -603,17 +603,6 @@ public final class ExchangeHelper { } /** - * Checks whether the exchange {@link UnitOfWork} has been interrupted during processing - * - * @param exchange the exchange - * @return <tt>true</tt> if interrupted, <tt>false</tt> otherwise - */ - public static boolean isInterrupted(Exchange exchange) { - Object value = exchange.getProperty(Exchange.INTERRUPTED); - return value != null && Boolean.TRUE == value; - } - - /** * Check whether or not stream caching is enabled for the given route or globally. * * @param exchange the exchange
