CAMEL-4596: pollEnrich supports dynamic uris.

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1b4af69c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1b4af69c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1b4af69c

Branch: refs/heads/master
Commit: 1b4af69c1bb2b03057feffc4ddd4e84baa2fba0b
Parents: b5be4d6
Author: Claus Ibsen <[email protected]>
Authored: Mon Jul 13 10:42:20 2015 +0200
Committer: Claus Ibsen <[email protected]>
Committed: Mon Jul 13 10:42:20 2015 +0200

----------------------------------------------------------------------
 .../apache/camel/impl/DefaultCamelContext.java  |  1 +
 .../apache/camel/impl/EmptyConsumerCache.java   | 76 ++++++++++++++++++++
 .../camel/model/PollEnrichDefinition.java       | 24 +++++++
 .../apache/camel/processor/PollEnricher.java    | 21 +++++-
 .../processor/RecipientListNoCacheTest.java     |  2 +-
 .../PollEnrichExpressionNoCacheTest.java        | 47 ++++++++++++
 .../enricher/PollEnrichExpressionTest.java      |  6 +-
 .../camel/spring/processor/pollEnricherRef.xml  |  8 +--
 8 files changed, 177 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 7056932..c24674e 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -2720,6 +2720,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
         // special for executorServiceManager as want to stop it manually
         doAddService(executorServiceManager, false);
         addService(producerServicePool);
+        addService(pollingConsumerServicePool);
         addService(inflightRepository);
         addService(asyncProcessorAwaitManager);
         addService(shutdownStrategy);

http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/main/java/org/apache/camel/impl/EmptyConsumerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EmptyConsumerCache.java b/camel-core/src/main/java/org/apache/camel/impl/EmptyConsumerCache.java
new file mode 100644
index 0000000..219371a
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/EmptyConsumerCache.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.FailedToCreateConsumerException;
+import org.apache.camel.IsSingleton;
+import org.apache.camel.PollingConsumer;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * A {@link ConsumerCache} which is always empty and does not cache any {@link org.apache.camel.Consumer}s.
+ */
+public class EmptyConsumerCache extends ConsumerCache {
+
+    public EmptyConsumerCache(Object source, CamelContext camelContext) {
+        super(source, camelContext, 0);
+    }
+
+    @Override
+    public PollingConsumer acquirePollingConsumer(Endpoint endpoint) {
+        // always create a new consumer
+        PollingConsumer answer;
+        try {
+            answer = endpoint.createPollingConsumer();
+            boolean singleton = true;
+            if (answer instanceof IsSingleton) {
+                singleton = ((IsSingleton) answer).isSingleton();
+            }
+            if (getCamelContext().isStartingRoutes() && singleton) {
+                // if we are currently starting a route, then add as service and enlist in JMX
+                // - but do not enlist non-singletons in JMX
+                // - note addService will also start the service
+                getCamelContext().addService(answer);
+            } else {
+                // must then start service so producer is ready to be used
+                ServiceHelper.startService(answer);
+            }
+        } catch (Exception e) {
+            throw new FailedToCreateConsumerException(endpoint, e);
+        }
+        return answer;
+    }
+
+    @Override
+    public void releasePollingConsumer(Endpoint endpoint, PollingConsumer pollingConsumer) {
+        // stop and shutdown the consumer as its not cache or reused
+        try {
+            ServiceHelper.stopAndShutdownService(pollingConsumer);
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "EmptyConsumerCache for source: " + getSource();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index eb1247b..18801d4 100644
--- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -52,6 +52,8 @@ public class PollEnrichDefinition extends NoOutputExpressionNode {
     private Boolean aggregateOnException;
     @XmlTransient
     private AggregationStrategy aggregationStrategy;
+    @XmlAttribute
+    private Integer cacheSize;
 
     public PollEnrichDefinition() {
     }
@@ -89,6 +91,9 @@ public class PollEnrichDefinition extends NoOutputExpressionNode {
         if (getAggregateOnException() != null) {
             enricher.setAggregateOnException(getAggregateOnException());
         }
+        if (getCacheSize() != null) {
+            enricher.setCacheSize(getCacheSize());
+        }
 
         return enricher;
     }
@@ -186,6 +191,18 @@ public class PollEnrichDefinition extends NoOutputExpressionNode {
         return this;
     }
 
+    /**
+     * Sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used
+     * to cache and reuse consumers when using this pollEnrich, when uris are reused.
+     *
+     * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
+     * @return the builder
+     */
+    public PollEnrichDefinition cacheSize(int cacheSize) {
+        setCacheSize(cacheSize);
+        return this;
+    }
+
     // Properties
     // -------------------------------------------------------------------------
 
@@ -237,4 +254,11 @@ public class PollEnrichDefinition extends NoOutputExpressionNode {
         this.aggregateOnException = aggregateOnException;
     }
 
+    public Integer getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(Integer cacheSize) {
+        this.cacheSize = cacheSize;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
index 9873cbb..de3c7b4 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -26,6 +26,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.impl.ConsumerCache;
+import org.apache.camel.impl.EmptyConsumerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.ServiceSupport;
@@ -59,6 +60,7 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw
     private final Expression expression;
     private long timeout;
     private boolean aggregateOnException;
+    private int cacheSize;
 
     /**
      * Creates a new {@link PollEnricher}.
@@ -131,6 +133,14 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw
         this.aggregationStrategy = defaultAggregationStrategy();
     }
 
+    public int getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(int cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+
     public void process(Exchange exchange) throws Exception {
         AsyncProcessorHelper.process(this, exchange);
     }
@@ -275,7 +285,16 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw
     protected void doStart() throws Exception {
         if (consumerCache == null) {
             // create consumer cache if we use dynamic expressions for computing the endpoints to poll
-            consumerCache = new ConsumerCache(this, getCamelContext());
+            if (cacheSize < 0) {
+                consumerCache = new EmptyConsumerCache(this, camelContext);
+                LOG.debug("PollEnrich {} is not using ConsumerCache", this);
+            } else if (cacheSize == 0) {
+                consumerCache = new ConsumerCache(this, camelContext);
+                LOG.debug("PollEnrich {} using ConsumerCache with default cache size", this);
+            } else {
+                consumerCache = new ConsumerCache(this, camelContext, cacheSize);
+                LOG.debug("PollEnrich {} using ConsumerCache with cacheSize={}", this, cacheSize);
+            }
         }
         ServiceHelper.startServices(consumerCache, aggregationStrategy);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
index 122e72b..dd4b699 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
@@ -49,7 +49,7 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
         return new RouteBuilder() {
             public void configure() {
                 from("direct:a").recipientList(
-                        header("recipientListHeader").tokenize(",")).cacheSize(0);
+                        header("recipientListHeader").tokenize(",")).cacheSize(-1);
             }
         };
 

http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionNoCacheTest.java
new file mode 100644
index 0000000..5c125d6
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionNoCacheTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.enricher;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class PollEnrichExpressionNoCacheTest extends ContextTestSupport {
+
+    public void testPollEnricExpression() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
+
+        template.sendBody("seda:foo", "Hello World");
+        template.sendBody("seda:bar", "Bye World");
+
+        template.sendBodyAndHeader("direct:start", null, "source", "seda:foo");
+        template.sendBodyAndHeader("direct:start", null, "source", "seda:bar");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .pollEnrich().header("source").cacheSize(-1)
+                    .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
index 38a42ab..c14de38 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java
@@ -21,14 +21,16 @@ import org.apache.camel.builder.RouteBuilder;
 
 public class PollEnrichExpressionTest extends ContextTestSupport {
 
-    public void testPollEnricExpression() throws Exception {
-        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
+    public void testPollEnrichExpression() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World", "Hi World");
 
         template.sendBody("seda:foo", "Hello World");
         template.sendBody("seda:bar", "Bye World");
+        template.sendBody("seda:foo", "Hi World");
 
         template.sendBodyAndHeader("direct:start", null, "source", "seda:foo");
         template.sendBodyAndHeader("direct:start", null, "source", "seda:bar");
+        template.sendBodyAndHeader("direct:start", null, "source", "seda:foo");
 
         assertMockEndpointsSatisfied();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
index c0e0c0b..3703be4 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
@@ -33,7 +33,7 @@
     <route>
       <from uri="direct:enricher-test-1"/>
       <pollEnrich strategyRef="sampleAggregator">
-        <ref>foo1</ref>
+        <simple>ref:foo1</simple>
       </pollEnrich>
       <to uri="mock:mock"/>
     </route>
@@ -42,7 +42,7 @@
     <route>
       <from uri="direct:enricher-test-2"/>
       <pollEnrich timeout="1000" strategyRef="sampleAggregator">
-        <ref>foo2</ref>
+        <simple>ref:foo2</simple>
       </pollEnrich>
       <to uri="mock:mock"/>
     </route>
@@ -50,7 +50,7 @@
     <route>
       <from uri="direct:enricher-test-3"/>
       <pollEnrich timeout="-1" strategyRef="sampleAggregator">
-        <ref>foo3</ref>
+        <simple>ref:foo3</simple>
       </pollEnrich>
       <to uri="mock:mock"/>
     </route>
@@ -58,7 +58,7 @@
     <route>
       <from uri="direct:enricher-test-4"/>
       <pollEnrich strategyRef="sampleAggregator">
-        <ref>foo4</ref>
+        <simple>ref:foo4</simple>
       </pollEnrich>
       <to uri="mock:mock"/>
     </route>

Reply via email to