This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 865f05a1500b9a3890bcbe5fc1cea9cef58d578f Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Fri Jan 31 11:26:15 2025 +0100 (chores) camel-core: refactor large methods to help inlining --- .../org/apache/camel/processor/LoopProcessor.java | 26 ++-- .../org/apache/camel/processor/SendProcessor.java | 172 +++++++++++---------- .../apache/camel/processor/ThreadsProcessor.java | 24 +-- .../errorhandler/RedeliveryErrorHandler.java | 84 +++++----- 4 files changed, 166 insertions(+), 140 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java index 776ef59184e..e4e659a6026 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java @@ -170,21 +170,25 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, callback.done(false); } } catch (Exception e) { - if (LOG.isTraceEnabled()) { - LOG.trace("Processing failed for exchangeId: {} >>> {}", exchange.getExchangeId(), e.getMessage()); - } - if (expression != null) { - // if we should stop due to an exception etc, then make sure to dec task count - int gap = count - index; - while (gap-- > 0) { - taskCount.decrement(); - } - } - exchange.setException(e); + handleException(e); callback.done(false); } } + private void handleException(Exception e) { + if (LOG.isTraceEnabled()) { + LOG.trace("Processing failed for exchangeId: {} >>> {}", exchange.getExchangeId(), e.getMessage()); + } + if (expression != null) { + // if we should stop due to an exception etc, then make sure to dec task count + int gap = count - index; + while (gap-- > 0) { + taskCount.decrement(); + } + } + exchange.setException(e); + } + @Override public String toString() { return "LoopState"; diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java index 37dd575fccd..77d56b66b7c 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java @@ -154,76 +154,94 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E // if we have a producer then use that as its optimized if (producer != null) { - final Exchange target = exchange; + return sendUsingProducer(exchange, callback, existingPattern, originalBody, originalHeaders); + } else { // we can send with a different MEP pattern - if (destinationExchangePattern != null || pattern != null) { - target.setPattern(destinationExchangePattern != null ? destinationExchangePattern : pattern); - } - // set property which endpoint we send to - exchange.setProperty(ExchangePropertyKey.TO_ENDPOINT, destination.getEndpointUri()); - - final boolean sending = camelContext.getCamelContextExtension().isEventNotificationApplicable() - && EventHelper.notifyExchangeSending(exchange.getContext(), target, destination); - // record timing for sending the exchange using the producer - StopWatch watch; - if (sending) { - watch = new StopWatch(); - } else { - watch = null; - } + return sendUsingPattern(exchange, callback, existingPattern, originalBody, originalHeaders); + } + } - // optimize to only create a new callback if really needed, otherwise we can use the provided callback as-is - AsyncCallback ac = callback; - boolean newCallback = watch != null || existingPattern != target.getPattern() || variableReceive != null; - if (newCallback) { - ac = doneSync -> { - try { - // result should be stored in variable instead of message body/headers - if (ExchangeHelper.shouldSetVariableResult(target, variableReceive)) { - ExchangeHelper.setVariableFromMessageBodyAndHeaders(target, variableReceive, - target.getMessage()); - target.getMessage().setBody(originalBody); - target.getMessage().setHeaders(originalHeaders); - } - // restore previous MEP - target.setPattern(existingPattern); - // emit event that the exchange was sent to the endpoint - if (watch != null) { - long timeTaken = watch.taken(); - EventHelper.notifyExchangeSent(target.getContext(), target, destination, timeTaken); - } - } finally { - callback.done(doneSync); - } - }; - } - try { - // replace message body with variable - if (variableSend != null) { - Object value = ExchangeHelper.getVariable(exchange, variableSend); - exchange.getMessage().setBody(value); - } + private boolean sendUsingPattern( + Exchange exchange, AsyncCallback callback, ExchangePattern existingPattern, Object originalBody, + Map<String, Object> originalHeaders) { + if (destinationExchangePattern != null || pattern != null) { + exchange.setPattern(destinationExchangePattern != null ? destinationExchangePattern : pattern); + } + // set property which endpoint we send to + exchange.setProperty(ExchangePropertyKey.TO_ENDPOINT, destination.getEndpointUri()); - LOG.debug(">>>> {} {}", destination, exchange); - boolean sync = producer.process(exchange, ac); - if (!sync) { - EventHelper.notifyExchangeAsyncProcessingStartedEvent(camelContext, exchange); - } - return sync; - } catch (Exception throwable) { - exchange.setException(throwable); - callback.done(true); - } + // replace message body with variable + if (variableSend != null) { + Object value = ExchangeHelper.getVariable(exchange, variableSend); + exchange.getMessage().setBody(value); + } - return true; + LOG.debug(">>>> {} {}", destination, exchange); + + // send the exchange to the destination using the producer cache for the non optimized producers + return producerCache.doInAsyncProducer(destination, exchange, callback, + (producer, ex, cb) -> producer.process(ex, doneSync -> { + // restore previous MEP + exchange.setPattern(existingPattern); + // result should be stored in variable instead of message body/headers + if (ExchangeHelper.shouldSetVariableResult(exchange, variableReceive)) { + ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive, + exchange.getMessage()); + exchange.getMessage().setBody(originalBody); + exchange.getMessage().setHeaders(originalHeaders); + } + // signal we are done + cb.done(doneSync); + })); + } + + private boolean sendUsingProducer( + Exchange exchange, AsyncCallback callback, ExchangePattern existingPattern, Object originalBody, + Map<String, Object> originalHeaders) { + final Exchange target = exchange; + // we can send with a different MEP pattern + if (destinationExchangePattern != null || pattern != null) { + target.setPattern(destinationExchangePattern != null ? destinationExchangePattern : pattern); + } + // set property which endpoint we send to + exchange.setProperty(ExchangePropertyKey.TO_ENDPOINT, destination.getEndpointUri()); + + final boolean sending = camelContext.getCamelContextExtension().isEventNotificationApplicable() + && EventHelper.notifyExchangeSending(exchange.getContext(), target, destination); + // record timing for sending the exchange using the producer + StopWatch watch; + if (sending) { + watch = new StopWatch(); } else { - // we can send with a different MEP pattern - if (destinationExchangePattern != null || pattern != null) { - exchange.setPattern(destinationExchangePattern != null ? destinationExchangePattern : pattern); - } - // set property which endpoint we send to - exchange.setProperty(ExchangePropertyKey.TO_ENDPOINT, destination.getEndpointUri()); + watch = null; + } + // optimize to only create a new callback if really needed, otherwise we can use the provided callback as-is + AsyncCallback ac = callback; + boolean newCallback = watch != null || existingPattern != target.getPattern() || variableReceive != null; + if (newCallback) { + ac = doneSync -> { + try { + // result should be stored in variable instead of message body/headers + if (ExchangeHelper.shouldSetVariableResult(target, variableReceive)) { + ExchangeHelper.setVariableFromMessageBodyAndHeaders(target, variableReceive, + target.getMessage()); + target.getMessage().setBody(originalBody); + target.getMessage().setHeaders(originalHeaders); + } + // restore previous MEP + target.setPattern(existingPattern); + // emit event that the exchange was sent to the endpoint + if (watch != null) { + long timeTaken = watch.taken(); + EventHelper.notifyExchangeSent(target.getContext(), target, destination, timeTaken); + } + } finally { + callback.done(doneSync); + } + }; + } + try { // replace message body with variable if (variableSend != null) { Object value = ExchangeHelper.getVariable(exchange, variableSend); @@ -231,23 +249,17 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E } LOG.debug(">>>> {} {}", destination, exchange); - - // send the exchange to the destination using the producer cache for the non optimized producers - return producerCache.doInAsyncProducer(destination, exchange, callback, - (producer, ex, cb) -> producer.process(ex, doneSync -> { - // restore previous MEP - exchange.setPattern(existingPattern); - // result should be stored in variable instead of message body/headers - if (ExchangeHelper.shouldSetVariableResult(exchange, variableReceive)) { - ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive, - exchange.getMessage()); - exchange.getMessage().setBody(originalBody); - exchange.getMessage().setHeaders(originalHeaders); - } - // signal we are done - cb.done(doneSync); - })); + boolean sync = producer.process(exchange, ac); + if (!sync) { + EventHelper.notifyExchangeAsyncProcessingStartedEvent(camelContext, exchange); + } + return sync; + } catch (Exception throwable) { + exchange.setException(throwable); + callback.done(true); } + + return true; } public String getVariableSend() { diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/ThreadsProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/ThreadsProcessor.java index b9e2ddaa63b..48a0f109cba 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/ThreadsProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/ThreadsProcessor.java @@ -133,16 +133,20 @@ public class ThreadsProcessor extends AsyncProcessorSupport implements IdAware, // tell Camel routing engine we continue routing asynchronous return false; } catch (Exception e) { - if (executorService instanceof ThreadPoolExecutor tpe) { - // process the call in synchronous mode - ProcessCall call = new ProcessCall(exchange, callback, true); - rejectedPolicy.asRejectedExecutionHandler().rejectedExecution(call, tpe); - return true; - } else { - exchange.setException(e); - callback.done(true); - return true; - } + return handleException(exchange, callback, e); + } + } + + private boolean handleException(Exchange exchange, AsyncCallback callback, Exception e) { + if (executorService instanceof ThreadPoolExecutor tpe) { + // process the call in synchronous mode + ProcessCall call = new ProcessCall(exchange, callback, true); + rejectedPolicy.asRejectedExecutionHandler().rejectedExecution(call, tpe); + return true; + } else { + exchange.setException(e); + callback.done(true); + return true; } } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index 164958fb736..66f2af4edb4 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -756,48 +756,11 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport // okay there is a delay so create a scheduled task to have it executed in the future if (currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) { - - // we are doing a redelivery then a thread pool must be configured (see the doStart method) - ObjectHelper.notNull(executorService, - "Redelivery is enabled but ExecutorService has not been configured.", this); - - // schedule the redelivery task - if (LOG.isTraceEnabled()) { - LOG.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", redeliveryDelay, - exchange.getExchangeId()); - } - executorService.schedule(() -> reactiveExecutor.schedule(this::redeliver), redeliveryDelay, - TimeUnit.MILLISECONDS); - + runAsynchronousRedelivery(); } else { // async delayed redelivery was disabled or we are transacted so we must be synchronous // as the transaction manager requires to execute in the same thread context - try { - // we are doing synchronous redelivery and use thread sleep, so we keep track using a counter how many are sleeping - redeliverySleepCounter.incrementAndGet(); - boolean complete = sleep(); - redeliverySleepCounter.decrementAndGet(); - if (!complete) { - // the task was rejected - exchange.setException(new RejectedExecutionException("Redelivery not allowed while stopping")); - // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange - exchange.getExchangeExtension().setRedeliveryExhausted(true); - // jump to start of loop which then detects that we are failed and exhausted - reactiveExecutor.schedule(this); - } else { - reactiveExecutor.schedule(this::redeliver); - } - } catch (InterruptedException e) { - redeliverySleepCounter.decrementAndGet(); - // we was interrupted so break out - exchange.setException(e); - // mark the exchange to stop continue routing when interrupted - // as we do not want to continue routing (for example a task has been cancelled) - exchange.setRouteStop(true); - reactiveExecutor.schedule(callback); - - Thread.currentThread().interrupt(); - } + runSynchronousRedelivery(); } } else { // execute the task immediately @@ -819,6 +782,49 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport } } + private void runAsynchronousRedelivery() { + // we are doing a redelivery then a thread pool must be configured (see the doStart method) + ObjectHelper.notNull(executorService, + "Redelivery is enabled but ExecutorService has not been configured.", this); + + // schedule the redelivery task + if (LOG.isTraceEnabled()) { + LOG.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", redeliveryDelay, + exchange.getExchangeId()); + } + executorService.schedule(() -> reactiveExecutor.schedule(this::redeliver), redeliveryDelay, + TimeUnit.MILLISECONDS); + } + + private void runSynchronousRedelivery() { + try { + // we are doing synchronous redelivery and use thread sleep, so we keep track using a counter how many are sleeping + redeliverySleepCounter.incrementAndGet(); + boolean complete = sleep(); + redeliverySleepCounter.decrementAndGet(); + if (!complete) { + // the task was rejected + exchange.setException(new RejectedExecutionException("Redelivery not allowed while stopping")); + // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange + exchange.getExchangeExtension().setRedeliveryExhausted(true); + // jump to start of loop which then detects that we are failed and exhausted + reactiveExecutor.schedule(this); + } else { + reactiveExecutor.schedule(this::redeliver); + } + } catch (InterruptedException e) { + redeliverySleepCounter.decrementAndGet(); + // we was interrupted so break out + exchange.setException(e); + // mark the exchange to stop continue routing when interrupted + // as we do not want to continue routing (for example a task has been cancelled) + exchange.setRouteStop(true); + reactiveExecutor.schedule(callback); + + Thread.currentThread().interrupt(); + } + } + protected boolean isRunAllowed() { // if camel context is forcing a shutdown then do not allow running if (shutdownStrategy.isForceShutdown()) {
