CAMEL-7116: Jetty HTTP producer should stop its client thread pool when the producer is stopped.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/38119b9d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/38119b9d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/38119b9d Branch: refs/heads/camel-2.12.x Commit: 38119b9d63df163a6f3757662503e396634d29b8 Parents: 483b445 Author: Claus Ibsen <[email protected]> Authored: Fri Jan 10 10:08:17 2014 +0100 Committer: Claus Ibsen <[email protected]> Committed: Fri Jan 10 10:28:29 2014 +0100 ---------------------------------------------------------------------- .../component/jetty/JettyHttpProducer.java | 11 +++ .../HttpJettyProducerRecipientListTest.java | 70 ++++++++++++++++++++ 2 files changed, 81 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/38119b9d/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java b/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java index 3e28b75..fa4a69f 100644 --- a/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java +++ b/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java @@ -43,6 +43,7 @@ import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.io.ByteArrayBuffer; +import org.eclipse.jetty.util.component.LifeCycle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -266,6 +267,11 @@ public class JettyHttpProducer extends DefaultProducer implements AsyncProcessor // only start non-shared client if (!sharedClient && client != null) { client.start(); + // start the thread pool + 
LOG.debug("Starting client thread pool {}", client.getThreadPool()); + if (client.getThreadPool() instanceof LifeCycle) { + ((LifeCycle) client.getThreadPool()).start(); + } } super.doStart(); } @@ -276,6 +282,11 @@ public class JettyHttpProducer extends DefaultProducer implements AsyncProcessor // only stop non-shared client if (!sharedClient && client != null) { client.stop(); + // stop thread pool + LOG.debug("Stopping client thread pool {}", client.getThreadPool()); + if (client.getThreadPool() instanceof LifeCycle) { + ((LifeCycle) client.getThreadPool()).stop(); + } } } } http://git-wip-us.apache.org/repos/asf/camel/blob/38119b9d/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/HttpJettyProducerRecipientListTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/HttpJettyProducerRecipientListTest.java b/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/HttpJettyProducerRecipientListTest.java new file mode 100644 index 0000000..ab06e43 --- /dev/null +++ b/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/HttpJettyProducerRecipientListTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jetty.jettyproducer; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.jetty.BaseJettyTest; +import org.junit.Test; + +/** + * @version + */ +public class HttpJettyProducerRecipientListTest extends BaseJettyTest { + + @Test + public void testRecipientList() throws Exception { + // these tests does not run well on Windows + if (isPlatform("windows")) { + return; + } + + // give Jetty time to startup properly + Thread.sleep(1000); + + Exchange a = template.request("direct:a", new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader("slip", "jetty://http://localhost:" + getPort() + "/myapp?foo=123&bar=cheese&httpClientMinThreads=4&httpClientMaxThreads=8"); + } + }); + assertNotNull(a); + + assertEquals("Bye cheese", a.getOut().getBody(String.class)); + assertEquals(246, a.getOut().getHeader("foo", Integer.class).intValue()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:a").recipientList(header("slip")); + + from("jetty://http://localhost:{{port}}/myapp").process(new Processor() { + public void process(Exchange exchange) throws Exception { + int foo = exchange.getIn().getHeader("foo", Integer.class); + String bar = exchange.getIn().getHeader("bar", String.class); + + exchange.getOut().setHeader("foo", foo * 2); + 
exchange.getOut().setBody("Bye " + bar); + } + }); + } + }; + } +} \ No newline at end of file
