This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b19ba0569f398f2a0f341834f957401fef8cc743
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Apr 5 16:47:06 2021 +0200

    CAMEL-16451: camel-core - ExchangePooling for EIPs. Wiretap EIP
---
 .../engine/PooledProcessorExchangeFactory.java     | 119 +++++++++++++++++++++
 .../apache/camel/processor/WireTapProcessor.java   |   2 +-
 .../camel/main/DefaultConfigurationConfigurer.java |   6 ++
 3 files changed, 126 insertions(+), 1 deletion(-)

diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
new file mode 100644
index 0000000..d3c37a6
--- /dev/null
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.PooledExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.ProcessorExchangeFactory;
+import org.apache.camel.support.DefaultPooledExchange;
+import org.apache.camel.support.ExchangeHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pooled {@link org.apache.camel.spi.ProcessorExchangeFactory} that reuses {@link Exchange} instance from a pool.
+ */
+public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PooledProcessorExchangeFactory.class);
+
+    public PooledProcessorExchangeFactory() {
+    }
+
+    public PooledProcessorExchangeFactory(Processor processor) {
+        super(processor);
+    }
+
+    @Override
+    public boolean isPooled() {
+        return true;
+    }
+
+    @Override
+    public ProcessorExchangeFactory newProcessorExchangeFactory(Processor processor) {
+        PooledProcessorExchangeFactory answer = new PooledProcessorExchangeFactory(processor);
+        answer.setStatisticsEnabled(statisticsEnabled);
+        answer.setCapacity(capacity);
+        answer.setCamelContext(camelContext);
+        return answer;
+    }
+
+    @Override
+    public Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
+        Exchange answer = pool.poll();
+        if (answer == null) {
+            // create a new exchange as there was no free from the pool
+            PooledExchange pe = new DefaultPooledExchange(exchange);
+            ExchangeHelper.copyResults(pe, exchange);
+            // do not reuse message id on copy
+            pe.getIn().setMessageId(null);
+            // do not share the unit of work
+            if (pe.getUnitOfWork() != null) {
+                pe.getUnitOfWork().reset();
+            }
+            if (handover) {
+                // Need to hand over the completion for async invocation
+                pe.adapt(ExtendedExchange.class).handoverCompletions(exchange);
+            }
+            // set a correlation id so we can track back the original exchange
+            pe.setProperty(ExchangePropertyKey.CORRELATION_ID, exchange.getExchangeId());
+            if (statisticsEnabled) {
+                statistics.created.increment();
+            }
+            answer = pe;
+        } else {
+            if (statisticsEnabled) {
+                statistics.acquired.increment();
+            }
+            // reset exchange for reuse
+            PooledExchange ee = (PooledExchange) answer;
+            ee.reset(System.currentTimeMillis());
+        }
+        return answer;
+    }
+
+    @Override
+    public boolean release(Exchange exchange) {
+        try {
+            // done exchange before returning back to pool
+            PooledExchange ee = (PooledExchange) exchange;
+            ee.done(true);
+
+            // only release back in pool if reset was success
+            boolean inserted = pool.offer(exchange);
+
+            if (statisticsEnabled) {
+                if (inserted) {
+                    statistics.released.increment();
+                } else {
+                    statistics.discarded.increment();
+                }
+            }
+            return inserted;
+        } catch (Exception e) {
+            if (statisticsEnabled) {
+                statistics.discarded.increment();
+            }
+            LOG.debug("Error resetting exchange: {}. This exchange is discarded.", exchange);
+            return false;
+        }
+    }
+
+}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index 8cf889b..fff7cb9 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -102,10 +102,10 @@ public class WireTapProcessor extends AsyncProcessorSupport
             @Override
             public void done(boolean doneSync) {
                 taskCount.decrement();
-                taskFactory.release(WireTapTask.this);
                 if (processorExchangeFactory != null) {
                     processorExchangeFactory.release(exchange);
                 }
+                taskFactory.release(WireTapTask.this);
             }
         };
 
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
index 4af36cf..f5e1038 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
@@ -32,7 +32,9 @@ import org.apache.camel.health.HealthCheckRegistry;
 import org.apache.camel.health.HealthCheckRepository;
 import org.apache.camel.impl.debugger.BacklogTracer;
 import org.apache.camel.impl.engine.PooledExchangeFactory;
+import org.apache.camel.impl.engine.PooledProcessorExchangeFactory;
 import org.apache.camel.impl.engine.PrototypeExchangeFactory;
+import org.apache.camel.impl.engine.PrototypeProcessorExchangeFactory;
 import org.apache.camel.model.Model;
 import org.apache.camel.model.ModelCamelContext;
 import org.apache.camel.model.ModelLifecycleStrategy;
@@ -129,11 +131,15 @@ public final class DefaultConfigurationConfigurer {
 
         if ("pooled".equals(config.getExchangeFactory())) {
             ecc.setExchangeFactory(new PooledExchangeFactory());
+            ecc.setProcessorExchangeFactory(new PooledProcessorExchangeFactory());
         } else if ("prototype".equals(config.getExchangeFactory())) {
             ecc.setExchangeFactory(new PrototypeExchangeFactory());
+            ecc.setProcessorExchangeFactory(new PrototypeProcessorExchangeFactory());
         }
         ecc.getExchangeFactory().setCapacity(config.getExchangeFactoryCapacity());
+        ecc.getProcessorExchangeFactory().setCapacity(config.getExchangeFactoryCapacity());
         ecc.getExchangeFactory().setStatisticsEnabled(config.isExchangeFactoryStatisticsEnabled());
+        ecc.getProcessorExchangeFactory().setStatisticsEnabled(config.isExchangeFactoryStatisticsEnabled());
 
         if (!config.isJmxEnabled()) {
             camelContext.disableJMX();

Reply via email to