Updated Branches: refs/heads/master 55c66af89 -> c90f92441
CAMEL-6488: camel-netty-http allow to share port in OSGi environment. Work in progress. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7c947b46 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7c947b46 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7c947b46 Branch: refs/heads/master Commit: 7c947b46ca516a441d13a82813810feebbde0fd3 Parents: 55c66af Author: Claus Ibsen <[email protected]> Authored: Tue Jun 25 16:02:15 2013 +0200 Committer: Claus Ibsen <[email protected]> Committed: Wed Jun 26 08:32:24 2013 +0200 ---------------------------------------------------------------------- .../http/DefaultSharedNettyHttpServer.java | 78 +++++++++++++++++++ .../netty/http/HttpServerBootstrapFactory.java | 37 +++++---- .../http/HttpServerConsumerChannelFactory.java | 63 +++++++++++++++ .../netty/http/HttpServerPipelineFactory.java | 35 +++++++-- .../netty/http/NettyHttpComponent.java | 16 ++-- .../component/netty/http/NettyHttpConsumer.java | 10 +++ .../component/netty/http/NettyHttpEndpoint.java | 28 ++++++- .../netty/http/SharedNettyHttpServer.java | 53 +++++++++++++ .../HttpServerMultiplexChannelHandler.java | 27 +++++-- .../netty/http/NettySharedHttpServerTest.java | 82 ++++++++++++++++++++ .../netty/DefaultServerPipelineFactory.java | 29 +++++-- .../component/netty/NettyConfiguration.java | 9 --- .../NettyServerBootstrapConfiguration.java | 14 ++++ .../netty/NettyServerBootstrapFactory.java | 6 +- .../SingleTCPNettyServerBootstrapFactory.java | 44 +++++++---- .../SingleUDPNettyServerBootstrapFactory.java | 27 ++++--- 16 files changed, 478 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultSharedNettyHttpServer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultSharedNettyHttpServer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultSharedNettyHttpServer.java new file mode 100644 index 0000000..ff24819 --- /dev/null +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultSharedNettyHttpServer.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty.http; + +import org.apache.camel.component.netty.DefaultServerPipelineFactory; +import org.apache.camel.component.netty.NettyServerBootstrapConfiguration; +import org.apache.camel.component.netty.NettyServerBootstrapFactory; +import org.apache.camel.component.netty.http.handlers.HttpServerMultiplexChannelHandler; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ServiceHelper; +import org.jboss.netty.channel.ChannelPipelineFactory; + +/** + * A default {@link SharedNettyHttpServer} to make sharing Netty server in Camel applications easier. + */ +public class DefaultSharedNettyHttpServer extends ServiceSupport implements SharedNettyHttpServer { + + private NettyServerBootstrapConfiguration configuration; + private HttpServerConsumerChannelFactory channelFactory; + private HttpServerBootstrapFactory bootstrapFactory; + + public void setNettyServerBootstrapConfiguration(NettyServerBootstrapConfiguration configuration) { + this.configuration = configuration; + } + + public int getPort() { + return configuration != null ? configuration.getPort() : -1; + } + + public HttpServerConsumerChannelFactory getConsumerChannelFactory() { + return channelFactory; + } + + public NettyServerBootstrapFactory getServerBootstrapFactory() { + return bootstrapFactory; + } + + protected void doStart() throws Exception { + ObjectHelper.notNull(configuration, "setNettyServerBootstrapConfiguration() must be called with a NettyServerBootstrapConfiguration instance", this); + + // port must be set + if (configuration.getPort() <= 0) { + throw new IllegalArgumentException("Port must be configured on NettyServerBootstrapConfiguration " + configuration); + } + + // force using tcp as the underlying transport + configuration.setProtocol("tcp"); + // TODO: ChannelPipelineFactory should be a shared to handle adding consumers + ChannelPipelineFactory pipelineFactory = new HttpServerPipelineFactory(configuration); + + channelFactory = new HttpServerMultiplexChannelHandler(); + channelFactory.init(configuration.getPort()); + + // create bootstrap factory and disable compatible check as its shared among the consumers + bootstrapFactory = new HttpServerBootstrapFactory(channelFactory, false); + bootstrapFactory.init(null, configuration, pipelineFactory); + } + + @Override + protected void doStop() throws Exception { + ServiceHelper.stopServices(bootstrapFactory, channelFactory); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerBootstrapFactory.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerBootstrapFactory.java index e1168d7..cf8ae4e 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerBootstrapFactory.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerBootstrapFactory.java @@ -28,16 +28,22 @@ import org.slf4j.LoggerFactory; public class HttpServerBootstrapFactory extends SingleTCPNettyServerBootstrapFactory { private static final Logger LOG = LoggerFactory.getLogger(HttpServerBootstrapFactory.class); - private final NettyHttpComponent component; + private final HttpServerConsumerChannelFactory channelFactory; private int port; private NettyServerBootstrapConfiguration bootstrapConfiguration; + private boolean compatibleCheck; - public HttpServerBootstrapFactory(NettyHttpComponent component) { - this.component = component; + public HttpServerBootstrapFactory(HttpServerConsumerChannelFactory channelFactory) { + this(channelFactory, true); + } + + public HttpServerBootstrapFactory(HttpServerConsumerChannelFactory channelFactory, boolean compatibleCheck) { + this.channelFactory = channelFactory; + this.compatibleCheck = compatibleCheck; } @Override - public void init(CamelContext camelContext, NettyConfiguration configuration, ChannelPipelineFactory pipelineFactory) { + public void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) { super.init(camelContext, configuration, pipelineFactory); this.port = configuration.getPort(); this.bootstrapConfiguration = configuration; @@ -46,21 +52,24 @@ public class HttpServerBootstrapFactory extends SingleTCPNettyServerBootstrapFac } public void addConsumer(NettyConsumer consumer) { - // when adding additional consumers on the same port (eg to reuse port for multiple routes etc) then the Netty server bootstrap - // configuration must match, as its the 1st consumer that calls the init method, which configuration is used for the Netty server bootstrap - // we do this to avoid mis configuration, so people configure SSL and plain configuration on the same port etc. + if (compatibleCheck) { + // when adding additional consumers on the same port (eg to reuse port for multiple routes etc) then the Netty server bootstrap + // configuration must match, as its the 1st consumer that calls the init method, which configuration is used for the Netty server bootstrap + // we do this to avoid mis configuration, so people configure SSL and plain configuration on the same port etc. - // first it may be the same instance, so only check for compatibility of different instance - if (bootstrapConfiguration != consumer.getConfiguration() && !bootstrapConfiguration.compatible(consumer.getConfiguration())) { - throw new IllegalArgumentException("Bootstrap configuration must be identical when adding additional consumer: " + consumer.getEndpoint() + " on same port: " + port - + ".\n Existing " + bootstrapConfiguration.toStringBootstrapConfiguration() + "\n New " + consumer.getConfiguration().toStringBootstrapConfiguration()); + // first it may be the same instance, so only check for compatibility of different instance + if (bootstrapConfiguration != consumer.getConfiguration() && !bootstrapConfiguration.compatible(consumer.getConfiguration())) { + throw new IllegalArgumentException("Bootstrap configuration must be identical when adding additional consumer: " + consumer.getEndpoint() + " on same port: " + port + + ".\n Existing " + bootstrapConfiguration.toStringBootstrapConfiguration() + "\n New " + consumer.getConfiguration().toStringBootstrapConfiguration()); + } } if (LOG.isDebugEnabled()) { NettyHttpConsumer httpConsumer = (NettyHttpConsumer) consumer; LOG.debug("BootstrapFactory on port {} is adding consumer with context-path {}", port, httpConsumer.getConfiguration().getPath()); } - component.getMultiplexChannelHandler(port).addConsumer((NettyHttpConsumer) consumer); + + channelFactory.addConsumer((NettyHttpConsumer) consumer); } @Override @@ -69,7 +78,7 @@ public class HttpServerBootstrapFactory extends SingleTCPNettyServerBootstrapFac NettyHttpConsumer httpConsumer = (NettyHttpConsumer) consumer; LOG.debug("BootstrapFactory on port {} is removing consumer with context-path {}", port, httpConsumer.getConfiguration().getPath()); } - component.getMultiplexChannelHandler(port).removeConsumer((NettyHttpConsumer) consumer); + channelFactory.removeConsumer((NettyHttpConsumer) consumer); } @Override @@ -81,7 +90,7 @@ public class HttpServerBootstrapFactory extends SingleTCPNettyServerBootstrapFac @Override public void stop() throws Exception { // only stop if no more active consumers - int consumers = component.getMultiplexChannelHandler(port).consumers(); + int consumers = channelFactory.consumers(); if (consumers == 0) { LOG.debug("BootstrapFactory on port {} is stopping", port); super.stop(); http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerConsumerChannelFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerConsumerChannelFactory.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerConsumerChannelFactory.java new file mode 100644 index 0000000..386cf09 --- /dev/null +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerConsumerChannelFactory.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty.http; + +import org.jboss.netty.channel.ChannelHandler; + +/** + * Factory for setting up Netty {@link ChannelHandler} bound to a given Netty port. + * <p/> + * This factory allows for consumers to reuse existing {@link org.jboss.netty.bootstrap.ServerBootstrap} which + * allows to share the same port for multiple consumers. + * + * This factory is needed to ensure we can handle the situations when consumers is added and removing in + * a dynamic environment such as OSGi, where Camel applications can be hot-deployed. And we want these + * Camel applications to be able to share the same Netty port in a easy way. + */ +public interface HttpServerConsumerChannelFactory { + + /** + * Initializes this consumer channel factory with the given port. + */ + void init(int port); + + /** + * The port number this consumer channel factory is using. + */ + int getPort(); + + /** + * Adds the given consumer. + */ + void addConsumer(NettyHttpConsumer consumer); + + /** + * Removes the given consumer + */ + void removeConsumer(NettyHttpConsumer consumer); + + /** + * Number of active consumers + */ + int consumers(); + + /** + * Gets the {@link ChannelHandler} + */ + ChannelHandler getChannelHandler(); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerPipelineFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerPipelineFactory.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerPipelineFactory.java index 434fa36..9772460 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerPipelineFactory.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerPipelineFactory.java @@ -20,6 +20,7 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import org.apache.camel.component.netty.NettyConsumer; +import org.apache.camel.component.netty.NettyServerBootstrapConfiguration; import org.apache.camel.component.netty.ServerPipelineFactory; import org.apache.camel.component.netty.ssl.SSLEngineFactory; import org.apache.camel.util.ObjectHelper; @@ -47,10 +48,23 @@ public class HttpServerPipelineFactory extends ServerPipelineFactory { // default constructor needed } + public HttpServerPipelineFactory(NettyServerBootstrapConfiguration configuration) { + this.consumer = null; + try { + this.sslContext = createSSLContext(configuration); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + + if (sslContext != null) { + LOG.info("Created SslContext {}", sslContext); + } + } + public HttpServerPipelineFactory(NettyHttpConsumer nettyConsumer) { this.consumer = nettyConsumer; try { - this.sslContext = createSSLContext(consumer); + this.sslContext = createSSLContext(consumer.getConfiguration()); } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } @@ -70,6 +84,7 @@ public class HttpServerPipelineFactory extends ServerPipelineFactory { // Create a default pipeline implementation. ChannelPipeline pipeline = Channels.pipeline(); + // TODO: on demand use configuration SslHandler sslHandler = configureServerSSLOnDemand(); if (sslHandler != null) { LOG.debug("Server SSL handler configured and added as an interceptor against the ChannelPipeline: {}", sslHandler); @@ -86,22 +101,28 @@ public class HttpServerPipelineFactory extends ServerPipelineFactory { pipeline.addLast("deflater", new HttpContentCompressor()); } + // TODO: shared netty http server in ctr // handler to route Camel messages - int port = consumer.getConfiguration().getPort(); - ChannelHandler handler = consumer.getEndpoint().getComponent().getMultiplexChannelHandler(port); + ChannelHandler handler; + if (consumer.getSharedNettyHttpServer() != null) { + handler = consumer.getSharedNettyHttpServer().getConsumerChannelFactory().getChannelHandler(); + } else { + int port = consumer.getConfiguration().getPort(); + handler = consumer.getEndpoint().getComponent().getMultiplexChannelHandler(port).getChannelHandler(); + } pipeline.addLast("handler", handler); return pipeline; } - private SSLContext createSSLContext(NettyConsumer consumer) throws Exception { - if (!consumer.getConfiguration().isSsl()) { + private SSLContext createSSLContext(NettyServerBootstrapConfiguration configuration) throws Exception { + if (!configuration.isSsl()) { return null; } // create ssl context once - if (consumer.getConfiguration().getSslContextParameters() != null) { - SSLContext context = consumer.getConfiguration().getSslContextParameters().createSSLContext(); + if (configuration.getSslContextParameters() != null) { + SSLContext context = configuration.getSslContextParameters().createSSLContext(); return context; } http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpComponent.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpComponent.java index 11a728d..1542c9e 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpComponent.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpComponent.java @@ -37,10 +37,8 @@ import org.apache.camel.util.UnsafeUriCharactersEncoder; */ public class NettyHttpComponent extends NettyComponent implements HeaderFilterStrategyAware { - // TODO: support on consumer - // - urlrewrite - - private final Map<Integer, HttpServerMultiplexChannelHandler> multiplexChannelHandlers = new HashMap<Integer, HttpServerMultiplexChannelHandler>(); + // factories which is created by this component and therefore manage their lifecycles + private final Map<Integer, HttpServerConsumerChannelFactory> multiplexChannelHandlers = new HashMap<Integer, HttpServerConsumerChannelFactory>(); private final Map<String, HttpServerBootstrapFactory> bootstrapFactories = new HashMap<String, HttpServerBootstrapFactory>(); private NettyHttpBinding nettyHttpBinding; private HeaderFilterStrategy headerFilterStrategy; @@ -129,10 +127,11 @@ public class NettyHttpComponent extends NettyComponent implements HeaderFilterSt this.headerFilterStrategy = headerFilterStrategy; } - public synchronized HttpServerMultiplexChannelHandler getMultiplexChannelHandler(int port) { - HttpServerMultiplexChannelHandler answer = multiplexChannelHandlers.get(port); + public synchronized HttpServerConsumerChannelFactory getMultiplexChannelHandler(int port) { + HttpServerConsumerChannelFactory answer = multiplexChannelHandlers.get(port); if (answer == null) { - answer = new HttpServerMultiplexChannelHandler(port); + answer = new HttpServerMultiplexChannelHandler(); + answer.init(port); multiplexChannelHandlers.put(port, answer); } return answer; @@ -142,7 +141,8 @@ public class NettyHttpComponent extends NettyComponent implements HeaderFilterSt String key = consumer.getConfiguration().getAddress(); HttpServerBootstrapFactory answer = bootstrapFactories.get(key); if (answer == null) { - answer = new HttpServerBootstrapFactory(this); + HttpServerConsumerChannelFactory channelFactory = getMultiplexChannelHandler(consumer.getConfiguration().getPort()); + answer = new HttpServerBootstrapFactory(channelFactory); answer.init(getCamelContext(), consumer.getConfiguration(), new HttpServerPipelineFactory(consumer)); bootstrapFactories.put(key, answer); } http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java index 2c27204..e4f9b40 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java @@ -26,6 +26,8 @@ import org.apache.camel.util.ObjectHelper; */ public class NettyHttpConsumer extends NettyConsumer { + private SharedNettyHttpServer sharedNettyHttpServer; + public NettyHttpConsumer(NettyHttpEndpoint nettyEndpoint, Processor processor, NettyConfiguration configuration) { super(nettyEndpoint, processor, configuration); } @@ -40,6 +42,14 @@ public class NettyHttpConsumer extends NettyConsumer { return (NettyHttpConfiguration) super.getConfiguration(); } + public SharedNettyHttpServer getSharedNettyHttpServer() { + return sharedNettyHttpServer; + } + + public void setSharedNettyHttpServer(SharedNettyHttpServer sharedNettyHttpServer) { + this.sharedNettyHttpServer = sharedNettyHttpServer; + } + @Override protected void doStart() throws Exception { super.doStart(); http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java index 011ce65..aac08bd 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java @@ -31,17 +31,21 @@ import org.apache.camel.util.ObjectHelper; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.handler.codec.http.HttpRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * HTTP based {@link NettyEndpoint} */ public class NettyHttpEndpoint extends NettyEndpoint implements HeaderFilterStrategyAware { + private static final Logger LOG = LoggerFactory.getLogger(NettyHttpEndpoint.class); private String uriParameters; private NettyHttpBinding nettyHttpBinding; private HeaderFilterStrategy headerFilterStrategy; private boolean traceEnabled; private String httpMethodRestrict; + private SharedNettyHttpServer sharedNettyHttpServer; public NettyHttpEndpoint(String endpointUri, NettyHttpComponent component, NettyConfiguration configuration) { super(endpointUri, component, configuration); @@ -56,10 +60,18 @@ public class NettyHttpEndpoint extends NettyEndpoint implements HeaderFilterStra public Consumer createConsumer(Processor processor) throws Exception { NettyHttpConsumer answer = new NettyHttpConsumer(this, processor, getConfiguration()); configureConsumer(answer); - // reuse pipeline factory for the same address - HttpServerBootstrapFactory factory = getComponent().getOrCreateHttpNettyServerBootstrapFactory(answer); - // force using our server bootstrap factory - answer.setNettyServerBootstrapFactory(factory); + + if (sharedNettyHttpServer != null) { + answer.setSharedNettyHttpServer(sharedNettyHttpServer); + answer.setNettyServerBootstrapFactory(sharedNettyHttpServer.getServerBootstrapFactory()); + LOG.debug("Created NettyHttpConsumer: {} using SharedNettyHttpServer: {}", answer, sharedNettyHttpServer); + } else { + // reuse pipeline factory for the same address + HttpServerBootstrapFactory factory = getComponent().getOrCreateHttpNettyServerBootstrapFactory(answer); + // force using our server bootstrap factory + answer.setNettyServerBootstrapFactory(factory); + LOG.debug("Created NettyHttpConsumer: {} using HttpServerBootstrapFactory: {}", answer, factory); + } return answer; } @@ -148,6 +160,14 @@ public class NettyHttpEndpoint extends NettyEndpoint implements HeaderFilterStra this.uriParameters = uriParameters; } + public SharedNettyHttpServer getSharedNettyHttpServer() { + return sharedNettyHttpServer; + } + + public void setSharedNettyHttpServer(SharedNettyHttpServer sharedNettyHttpServer) { + this.sharedNettyHttpServer = sharedNettyHttpServer; + } + @Override protected void doStart() throws Exception { super.doStart(); http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/SharedNettyHttpServer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/SharedNettyHttpServer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/SharedNettyHttpServer.java new file mode 100644 index 0000000..66244bb --- /dev/null +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/SharedNettyHttpServer.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty.http; + +import org.apache.camel.Service; +import org.apache.camel.component.netty.NettyServerBootstrapConfiguration; +import org.apache.camel.component.netty.NettyServerBootstrapFactory; + +/** + * A single interface to easily configure and setup a shared Netty HTTP server + * to be re-used among other Camel applications. + * <p/> + * To use this, just define a {@link NettyServerBootstrapConfiguration} configuration, and + * set this using {@link #setNettyServerBootstrapConfiguration(org.apache.camel.component.netty.NettyServerBootstrapConfiguration)}. + * Then call the {@link #start()} to initialize this shared server. + */ +public interface SharedNettyHttpServer extends Service { + + /** + * Sets the bootstrap configuration to use by this shared Netty HTTP server. + */ + void setNettyServerBootstrapConfiguration(NettyServerBootstrapConfiguration configuration); + + /** + * Gets the port number this Netty HTTP server uses. + */ + int getPort(); + + /** + * Gets the {@link HttpServerConsumerChannelFactory} to use. + */ + HttpServerConsumerChannelFactory getConsumerChannelFactory(); + + /** + * Gets the {@link NettyServerBootstrapFactory} to use. + */ + NettyServerBootstrapFactory getServerBootstrapFactory(); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerMultiplexChannelHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerMultiplexChannelHandler.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerMultiplexChannelHandler.java index 71d9218..1674235 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerMultiplexChannelHandler.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerMultiplexChannelHandler.java @@ -23,9 +23,11 @@ import java.util.concurrent.ConcurrentMap; import org.apache.camel.Exchange; import org.apache.camel.component.netty.http.ContextPathMatcher; import org.apache.camel.component.netty.http.DefaultContextPathMatcher; +import org.apache.camel.component.netty.http.HttpServerConsumerChannelFactory; import org.apache.camel.component.netty.http.NettyHttpConsumer; import org.apache.camel.util.UnsafeUriCharactersEncoder; import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.ChannelHandler; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; @@ -44,15 +46,21 @@ import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; * target handler based on the http context path in the incoming request. This is used to allow to reuse * the same Netty consumer, allowing to have multiple routes on the same netty {@link org.jboss.netty.bootstrap.ServerBootstrap} */ -public class HttpServerMultiplexChannelHandler extends SimpleChannelUpstreamHandler { +public class HttpServerMultiplexChannelHandler extends SimpleChannelUpstreamHandler implements HttpServerConsumerChannelFactory { // use NettyHttpConsumer as logger to make it easier to read the logs as this is part of the consumer private static final transient Logger LOG = LoggerFactory.getLogger(NettyHttpConsumer.class); private final ConcurrentMap<ContextPathMatcher, HttpServerChannelHandler> consumers = new ConcurrentHashMap<ContextPathMatcher, HttpServerChannelHandler>(); - private final String token; - private final int len; + private int port; + private String token; + private int len; - public HttpServerMultiplexChannelHandler(int port) { + public HttpServerMultiplexChannelHandler() { + // must have default no-arg constructor to allow IoC containers to manage it + } + + public void init(int port) { + this.port = port; this.token = ":" + port; this.len = token.length(); } @@ -69,13 +77,18 @@ public class HttpServerMultiplexChannelHandler extends SimpleChannelUpstreamHand consumers.remove(matcher); } - /** - * Number of active consumers. - */ public int consumers() { return consumers.size(); } + public int getPort() { + return port; + } + + public ChannelHandler getChannelHandler() { + return this; + } + @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception { // store request, as this channel handler is created per pipeline http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettySharedHttpServerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettySharedHttpServerTest.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettySharedHttpServerTest.java new file mode 100644 index 0000000..67bbff4 --- /dev/null +++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettySharedHttpServerTest.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty.http; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.netty.NettyServerBootstrapConfiguration; +import org.apache.camel.impl.JndiRegistry; +import org.junit.Test; + +public class NettySharedHttpServerTest extends BaseNettyTest { + + private SharedNettyHttpServer sharedNettyHttpServer; + + @Override + protected JndiRegistry createRegistry() throws Exception { + sharedNettyHttpServer = new DefaultSharedNettyHttpServer(); + + NettyServerBootstrapConfiguration configuration = new NettyServerBootstrapConfiguration(); + configuration.setPort(getPort()); + configuration.setHost("localhost"); + configuration.setBacklog(20); + configuration.setKeepAlive(true); + sharedNettyHttpServer.setNettyServerBootstrapConfiguration(configuration); + + sharedNettyHttpServer.start(); + + JndiRegistry jndi = super.createRegistry(); + jndi.bind("myNettyServer", sharedNettyHttpServer); + return jndi; + } + + @Override + public void tearDown() throws Exception { + sharedNettyHttpServer.stop(); + super.tearDown(); + } + + @Test + public void testTwoRoutes() throws Exception { + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:bar").expectedBodiesReceived("Hello Camel"); + + String out = template.requestBody("netty-http:http://localhost:{{port}}/foo", "Hello World", String.class); + assertEquals("Bye World", out); + + out = template.requestBody("netty-http:http://localhost:{{port}}/bar", "Hello Camel", String.class); + assertEquals("Bye Camel", out); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("netty-http:http://0.0.0.0:{{port}}/foo?sharedNettyHttpServer=#myNettyServer") + .to("mock:foo") + .transform().constant("Bye World"); + + from("netty-http:http://0.0.0.0:{{port}}/bar?sharedNettyHttpServer=#myNettyServer") + .to("mock:bar") + .transform().constant("Bye Camel"); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java index 247067d..6ae04d0 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java @@ -34,16 +34,33 @@ import org.slf4j.LoggerFactory; public class DefaultServerPipelineFactory extends ServerPipelineFactory { private static final Logger LOG = LoggerFactory.getLogger(DefaultServerPipelineFactory.class); - private final NettyConsumer consumer; + private NettyConsumer consumer; private SSLContext sslContext; + public DefaultServerPipelineFactory(NettyServerBootstrapConfiguration configuration) { + this.consumer = null; + try { + this.sslContext = createSSLContext(configuration); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + + if (sslContext != null) { + LOG.info("Created SslContext {}", sslContext); + } + } + public DefaultServerPipelineFactory(NettyConsumer consumer) { this.consumer = consumer; try { - this.sslContext = createSSLContext(consumer); + this.sslContext = createSSLContext(consumer.getConfiguration()); } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } + + if (sslContext != null) { + LOG.info("Created SslContext {}", sslContext); + } } @Override @@ -97,14 +114,14 @@ public class DefaultServerPipelineFactory extends ServerPipelineFactory { pipeline.addLast(name, handler); } - private SSLContext createSSLContext(NettyConsumer consumer) throws Exception { - if (!consumer.getConfiguration().isSsl()) { + private SSLContext createSSLContext(NettyServerBootstrapConfiguration configuration) throws Exception { + if (!configuration.isSsl()) { return null; } // create ssl context once - if (consumer.getConfiguration().getSslContextParameters() != null) { - SSLContext context = consumer.getConfiguration().getSslContextParameters().createSSLContext(); + if (configuration.getSslContextParameters() != null) { + SSLContext context = configuration.getSslContextParameters().createSSLContext(); return context; } http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java index 0c1fa0e..53bd02f 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java @@ -58,7 +58,6 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem private LoggingLevel serverClosedChannelExceptionCaughtLogLevel = LoggingLevel.DEBUG; private boolean allowDefaultCodec = true; private ClientPipelineFactory clientPipelineFactory; - private SSLContextParameters sslContextParameters; private int maximumPoolSize = 16; private boolean orderedThreadPoolExecutor = true; private int producerPoolMaxActive = -1; @@ -366,14 +365,6 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem return clientPipelineFactory; } - public SSLContextParameters getSslContextParameters() { - return sslContextParameters; - } - - public void setSslContextParameters(SSLContextParameters sslContextParameters) { - this.sslContextParameters = sslContextParameters; - } - public int getMaximumPoolSize() { return maximumPoolSize; } http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java index 6992874..bc71182 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java @@ -19,6 +19,7 @@ package org.apache.camel.component.netty; import java.io.File; import java.util.Map; +import org.apache.camel.util.jsse.SSLContextParameters; import org.jboss.netty.handler.ssl.SslHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +46,7 @@ public class NettyServerBootstrapConfiguration implements Cloneable { // SSL options is also part of the server bootstrap as the server listener on port X is either plain or SSL protected boolean ssl; protected SslHandler sslHandler; + protected SSLContextParameters sslContextParameters; protected boolean needClientAuth; protected File keyStoreFile; protected File trustStoreFile; @@ -182,6 +184,14 @@ public class NettyServerBootstrapConfiguration implements Cloneable { this.sslHandler = sslHandler; } + public SSLContextParameters getSslContextParameters() { + return sslContextParameters; + } + + public void setSslContextParameters(SSLContextParameters sslContextParameters) { + this.sslContextParameters = sslContextParameters; + } + public boolean isNeedClientAuth() { return needClientAuth; } @@ -349,6 +359,9 @@ public class NettyServerBootstrapConfiguration implements Cloneable { if (sslHandler != other.sslHandler) { return false; } + if (sslContextParameters != other.sslContextParameters) { + return false; + } if (needClientAuth != other.needClientAuth) { return false; } @@ -396,6 +409,7 @@ public class NettyServerBootstrapConfiguration implements Cloneable { ", options=" + options + ", ssl=" + ssl + ", sslHandler=" + sslHandler + + ", sslContextParameters='" + sslContextParameters + '\'' + ", needClientAuth=" + needClientAuth + ", keyStoreFile=" + keyStoreFile + ", trustStoreFile=" + trustStoreFile + http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java index 6dbb817..a27ccfb 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java @@ -32,8 +32,12 @@ public interface NettyServerBootstrapFactory extends Service { /** * Initializes this {@link NettyServerBootstrapFactory}. + * + * @param camelContext Use <tt>null</tt> if this factory is to be shared among other Camel applications. + * @param configuration the bootstrap configuration + * @param pipelineFactory the pipeline factory */ - void init(CamelContext camelContext, NettyConfiguration configuration, ChannelPipelineFactory pipelineFactory); + void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory); /** * When a new {@link Channel} is opened. http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java index c6dba03..45442ce 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java @@ -19,10 +19,10 @@ package org.apache.camel.component.netty; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.camel.CamelContext; import org.apache.camel.support.ServiceSupport; -import org.apache.camel.util.ObjectHelper; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; @@ -42,7 +42,7 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme protected static final Logger LOG = LoggerFactory.getLogger(SingleTCPNettyServerBootstrapFactory.class); private final ChannelGroup allChannels; private CamelContext camelContext; - private NettyConfiguration configuration; + private NettyServerBootstrapConfiguration configuration; private ChannelPipelineFactory pipelineFactory; private ChannelFactory channelFactory; private ServerBootstrap serverBootstrap; @@ -54,7 +54,8 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme this.allChannels = new DefaultChannelGroup(SingleTCPNettyServerBootstrapFactory.class.getName()); } - public void init(CamelContext camelContext, NettyConfiguration configuration, ChannelPipelineFactory pipelineFactory) { + public void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) { + // notice CamelContext can be optional this.camelContext = camelContext; this.configuration = configuration; this.pipelineFactory = pipelineFactory; @@ -78,7 +79,6 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme @Override protected void doStart() throws Exception { - ObjectHelper.notNull(camelContext, "CamelContext"); startServerBootstrap(); } @@ -88,15 +88,25 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme } protected void startServerBootstrap() { - bossExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss"); - workerExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker"); - - if (configuration.getWorkerCount() <= 0) { - channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor); + if (camelContext != null) { + bossExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss"); + workerExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker"); + + if (configuration.getWorkerCount() <= 0) { + channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor); + } else { + channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor, + configuration.getWorkerCount()); + } } else { - channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor, - configuration.getWorkerCount()); + if (configuration.getWorkerCount() <= 0) { + channelFactory = new NioServerSocketChannelFactory(); + } else { + channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), + Executors.newCachedThreadPool(), configuration.getWorkerCount()); + } } + serverBootstrap = new ServerBootstrap(channelFactory); serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); @@ -138,11 +148,19 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme // and then shutdown the thread pools if (bossExecutor != null) { - camelContext.getExecutorServiceManager().shutdown(bossExecutor); + if (camelContext != null) { + camelContext.getExecutorServiceManager().shutdown(bossExecutor); + } else { + bossExecutor.shutdownNow(); + } bossExecutor = null; } if (workerExecutor != null) { - camelContext.getExecutorServiceManager().shutdown(workerExecutor); + if (camelContext != null) { + camelContext.getExecutorServiceManager().shutdown(workerExecutor); + } else { + workerExecutor.shutdownNow(); + } workerExecutor = null; } } http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java index ea6e5a0..138c98f 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java @@ -19,10 +19,10 @@ package org.apache.camel.component.netty; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.camel.CamelContext; import org.apache.camel.support.ServiceSupport; -import org.apache.camel.util.ObjectHelper; import org.jboss.netty.bootstrap.ConnectionlessBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelPipelineFactory; @@ -43,19 +43,19 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme protected static final Logger LOG = LoggerFactory.getLogger(SingleUDPNettyServerBootstrapFactory.class); private final ChannelGroup allChannels; private CamelContext camelContext; - private NettyConfiguration configuration; + private NettyServerBootstrapConfiguration configuration; private ChannelPipelineFactory pipelineFactory; private DatagramChannelFactory datagramChannelFactory; private ConnectionlessBootstrap connectionlessServerBootstrap; private Channel channel; - private ExecutorService bossExecutor; private ExecutorService workerExecutor; public SingleUDPNettyServerBootstrapFactory() { this.allChannels = new DefaultChannelGroup(SingleUDPNettyServerBootstrapFactory.class.getName()); } - public void init(CamelContext camelContext, NettyConfiguration configuration, ChannelPipelineFactory pipelineFactory) { + public void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) { + // notice CamelContext can be optional this.camelContext = camelContext; this.configuration = configuration; this.pipelineFactory = pipelineFactory; @@ -79,7 +79,6 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme @Override protected void doStart() throws Exception { - ObjectHelper.notNull(camelContext, "CamelContext"); startServerBootstrap(); } @@ -89,12 +88,18 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme } protected void startServerBootstrap() { - workerExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker"); + if (camelContext != null) { + workerExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker"); + } else { + workerExecutor = Executors.newCachedThreadPool(); + } + if (configuration.getWorkerCount() <= 0) { datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor); } else { datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor, configuration.getWorkerCount()); } + connectionlessServerBootstrap = new ConnectionlessBootstrap(datagramChannelFactory); connectionlessServerBootstrap.setOption("child.keepAlive", configuration.isKeepAlive()); connectionlessServerBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay()); @@ -143,12 +148,12 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme } // and then shutdown the thread pools - if (bossExecutor != null) { - camelContext.getExecutorServiceManager().shutdown(bossExecutor); - bossExecutor = null; - } if (workerExecutor != null) { - camelContext.getExecutorServiceManager().shutdown(workerExecutor); + if (camelContext != null) { + camelContext.getExecutorServiceManager().shutdown(workerExecutor); + } else { + workerExecutor.shutdownNow(); + } workerExecutor = null; } }
