This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit b19ba0569f398f2a0f341834f957401fef8cc743 Author: Claus Ibsen <[email protected]> AuthorDate: Mon Apr 5 16:47:06 2021 +0200 CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP --- .../engine/PooledProcessorExchangeFactory.java | 119 +++++++++++++++++++++ .../apache/camel/processor/WireTapProcessor.java | 2 +- .../camel/main/DefaultConfigurationConfigurer.java | 6 ++ 3 files changed, 126 insertions(+), 1 deletion(-) diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java new file mode 100644 index 0000000..d3c37a6 --- /dev/null +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.engine; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePropertyKey; +import org.apache.camel.ExtendedExchange; +import org.apache.camel.PooledExchange; +import org.apache.camel.Processor; +import org.apache.camel.spi.ProcessorExchangeFactory; +import org.apache.camel.support.DefaultPooledExchange; +import org.apache.camel.support.ExchangeHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Pooled {@link org.apache.camel.spi.ProcessorExchangeFactory} that reuses {@link Exchange} instance from a pool. + */ +public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFactory { + + private static final Logger LOG = LoggerFactory.getLogger(PooledProcessorExchangeFactory.class); + + public PooledProcessorExchangeFactory() { + } + + public PooledProcessorExchangeFactory(Processor processor) { + super(processor); + } + + @Override + public boolean isPooled() { + return true; + } + + @Override + public ProcessorExchangeFactory newProcessorExchangeFactory(Processor processor) { + PooledProcessorExchangeFactory answer = new PooledProcessorExchangeFactory(processor); + answer.setStatisticsEnabled(statisticsEnabled); + answer.setCapacity(capacity); + answer.setCamelContext(camelContext); + return answer; + } + + @Override + public Exchange createCorrelatedCopy(Exchange exchange, boolean handover) { + Exchange answer = pool.poll(); + if (answer == null) { + // create a new exchange as there was no free from the pool + PooledExchange pe = new DefaultPooledExchange(exchange); + ExchangeHelper.copyResults(pe, exchange); + // do not reuse message id on copy + pe.getIn().setMessageId(null); + // do not share the unit of work + if (pe.getUnitOfWork() != null) { + pe.getUnitOfWork().reset(); + } + if (handover) { + // Need to hand over the completion for async invocation + pe.adapt(ExtendedExchange.class).handoverCompletions(exchange); + } + // set a correlation id so we can track back the original exchange + pe.setProperty(ExchangePropertyKey.CORRELATION_ID, exchange.getExchangeId()); + if (statisticsEnabled) { + statistics.created.increment(); + } + answer = pe; + } else { + if (statisticsEnabled) { + statistics.acquired.increment(); + } + // reset exchange for reuse + PooledExchange ee = (PooledExchange) answer; + ee.reset(System.currentTimeMillis()); + } + return answer; + } + + @Override + public boolean release(Exchange exchange) { + try { + // done exchange before returning back to pool + PooledExchange ee = (PooledExchange) exchange; + ee.done(true); + + // only release back in pool if reset was success + boolean inserted = pool.offer(exchange); + + if (statisticsEnabled) { + if (inserted) { + statistics.released.increment(); + } else { + statistics.discarded.increment(); + } + } + return inserted; + } catch (Exception e) { + if (statisticsEnabled) { + statistics.discarded.increment(); + } + LOG.debug("Error resetting exchange: {}. This exchange is discarded.", exchange); + return false; + } + } + +} diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java index 8cf889b..fff7cb9 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -102,10 +102,10 @@ public class WireTapProcessor extends AsyncProcessorSupport @Override public void done(boolean doneSync) { taskCount.decrement(); - taskFactory.release(WireTapTask.this); if (processorExchangeFactory != null) { processorExchangeFactory.release(exchange); } + taskFactory.release(WireTapTask.this); } }; diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java index 4af36cf..f5e1038 100644 --- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java +++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java @@ -32,7 +32,9 @@ import org.apache.camel.health.HealthCheckRegistry; import org.apache.camel.health.HealthCheckRepository; import org.apache.camel.impl.debugger.BacklogTracer; import org.apache.camel.impl.engine.PooledExchangeFactory; +import org.apache.camel.impl.engine.PooledProcessorExchangeFactory; import org.apache.camel.impl.engine.PrototypeExchangeFactory; +import org.apache.camel.impl.engine.PrototypeProcessorExchangeFactory; import org.apache.camel.model.Model; import org.apache.camel.model.ModelCamelContext; import org.apache.camel.model.ModelLifecycleStrategy; @@ -129,11 +131,15 @@ public final class DefaultConfigurationConfigurer { if ("pooled".equals(config.getExchangeFactory())) { ecc.setExchangeFactory(new PooledExchangeFactory()); + ecc.setProcessorExchangeFactory(new PooledProcessorExchangeFactory()); } else if ("prototype".equals(config.getExchangeFactory())) { ecc.setExchangeFactory(new PrototypeExchangeFactory()); + ecc.setProcessorExchangeFactory(new PrototypeProcessorExchangeFactory()); } ecc.getExchangeFactory().setCapacity(config.getExchangeFactoryCapacity()); + ecc.getProcessorExchangeFactory().setCapacity(config.getExchangeFactoryCapacity()); ecc.getExchangeFactory().setStatisticsEnabled(config.isExchangeFactoryStatisticsEnabled()); + ecc.getProcessorExchangeFactory().setStatisticsEnabled(config.isExchangeFactoryStatisticsEnabled()); if (!config.isJmxEnabled()) { camelContext.disableJMX();
