Author: davsclaus
Date: Fri Apr 29 18:43:51 2011
New Revision: 1097909
URL: http://svn.apache.org/viewvc?rev=1097909&view=rev
Log:
CAMEL-3912: Fixed issue when restarting routes using scheduled poll consumers,
and the consumer had consumer.xxx options in the endpoint. This would cause the
consumer.xxx options to be lost on the restart. Thanks to Roberto Rojas for
the patch.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollEndpointConfigureConsumerRestartTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1097909&r1=1097908&r2=1097909&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
Fri Apr 29 18:43:51 2011
@@ -46,7 +46,7 @@ public abstract class ScheduledPollConsu
private boolean useFixedDelay;
private PollingConsumerPollStrategy pollStrategy = new
DefaultPollingConsumerPollStrategy();
- public ScheduledPollConsumer(DefaultEndpoint endpoint, Processor
processor) {
+ public ScheduledPollConsumer(Endpoint endpoint, Processor processor) {
super(endpoint, processor);
// we only need one thread in the pool to schedule this task
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java?rev=1097909&r1=1097908&r2=1097909&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
Fri Apr 29 18:43:51 2011
@@ -59,14 +59,18 @@ public abstract class ScheduledPollEndpo
protected void configureConsumer(Consumer consumer) throws Exception {
if (consumerProperties != null) {
+ // use a defensive copy of the consumer properties as the methods
below will remove the used properties
+ // and in case we restart routes, we need access to the original
consumer properties again
+ Map<String, Object> copy = new HashMap<String,
Object>(consumerProperties);
+
// set reference properties first as they use # syntax that fools
the regular properties setter
- EndpointHelper.setReferenceProperties(getCamelContext(), consumer,
consumerProperties);
- EndpointHelper.setProperties(getCamelContext(), consumer,
consumerProperties);
- if (!this.isLenientProperties() && consumerProperties.size() > 0) {
- throw new
ResolveEndpointFailedException(this.getEndpointUri(), "There are " +
consumerProperties.size()
+ EndpointHelper.setReferenceProperties(getCamelContext(), consumer,
copy);
+ EndpointHelper.setProperties(getCamelContext(), consumer, copy);
+ if (!this.isLenientProperties() && copy.size() > 0) {
+ throw new
ResolveEndpointFailedException(this.getEndpointUri(), "There are " + copy.size()
+ " parameters that couldn't be set on the endpoint
consumer."
+ " Check the uri if the parameters are spelt correctly
and that they are properties of the endpoint."
- + " Unknown consumer parameters=[" + consumerProperties +
"]");
+ + " Unknown consumer parameters=[" + copy + "]");
}
}
}
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollEndpointConfigureConsumerRestartTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollEndpointConfigureConsumerRestartTest.java?rev=1097909&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollEndpointConfigureConsumerRestartTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollEndpointConfigureConsumerRestartTest.java
Fri Apr 29 18:43:51 2011
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class ScheduledPollEndpointConfigureConsumerRestartTest extends
ContextTestSupport {
+
+ private MyEndpoint my;
+ private Map<String, Object> props = new HashMap<String, Object>();
+
+ public void testRestart() throws Exception {
+ getMockEndpoint("mock:result").expectedMinimumMessageCount(1);
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals("Hello",
getMockEndpoint("mock:result").getExchanges().get(0).getIn().getBody());
+ assertEquals(123,
getMockEndpoint("mock:result").getExchanges().get(0).getIn().getHeader("foo"));
+
+ // restart route
+ resetMocks();
+ context.stopRoute("foo");
+
+ getMockEndpoint("mock:result").expectedMinimumMessageCount(1);
+
+ // start route
+ context.startRoute("foo");
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals("Hello",
getMockEndpoint("mock:result").getExchanges().get(0).getIn().getBody());
+ assertEquals(123,
getMockEndpoint("mock:result").getExchanges().get(0).getIn().getHeader("foo"));
+
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ props.put("foo", 123);
+ props.put("bar", "Hello");
+ props.put("delay", 1000);
+
+ my = new MyEndpoint();
+ my.setCamelContext(context);
+ my.setConsumerProperties(props);
+
+ from(my).routeId("foo").to("mock:result");
+ }
+ };
+ }
+
+ private class MyEndpoint extends ScheduledPollEndpoint {
+
+ @Override
+ public Producer createProducer() throws Exception {
+ return null;
+ }
+
+ @Override
+ public Consumer createConsumer(Processor processor) throws Exception {
+ MyConsumer answer = new MyConsumer(this, processor);
+ configureConsumer(answer);
+ return answer;
+ }
+
+ @Override
+ public boolean isSingleton() {
+ return true;
+ }
+
+ @Override
+ protected String createEndpointUri() {
+ return "myendpoint:foo";
+ }
+ }
+
+ public static final class MyConsumer extends ScheduledPollConsumer {
+
+ private int foo;
+ private String bar;
+
+ public MyConsumer(Endpoint endpoint, Processor processor) {
+ super(endpoint, processor);
+ }
+
+ public int getFoo() {
+ return foo;
+ }
+
+ public void setFoo(int foo) {
+ this.foo = foo;
+ }
+
+ public String getBar() {
+ return bar;
+ }
+
+ public void setBar(String bar) {
+ this.bar = bar;
+ }
+
+ @Override
+ protected int poll() throws Exception {
+ Exchange exchange = new DefaultExchange(getEndpoint());
+ exchange.getIn().setBody(bar);
+ exchange.getIn().setHeader("foo", foo);
+
+ getProcessor().process(exchange);
+
+ return 1;
+ }
+ }
+}