Repository: camel Updated Branches: refs/heads/master 109b8a436 -> e8e6cc4a6
CAMEL-10003: camel-netty4 - add support for using native transport. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e8e6cc4a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e8e6cc4a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e8e6cc4a Branch: refs/heads/master Commit: e8e6cc4a6da2c19050508b0c02d425519b4ea714 Parents: 109b8a4 Author: Claus Ibsen <[email protected]> Authored: Tue May 31 11:39:45 2016 +0200 Committer: Claus Ibsen <[email protected]> Committed: Tue May 31 11:39:45 2016 +0200 ---------------------------------------------------------------------- components/camel-netty4/src/main/docs/netty4.adoc | 7 ++++++- .../ClientModeTCPNettyServerBootstrapFactory.java | 8 +++++++- .../camel/component/netty4/NettyProducer.java | 18 +++++++++++++++--- .../netty4/NettyServerBootstrapConfiguration.java | 16 +++++++++++++++- .../netty4/NettyServerBossPoolBuilder.java | 17 ++++++++++++++++- .../component/netty4/NettyWorkerPoolBuilder.java | 17 ++++++++++++++++- .../SingleTCPNettyServerBootstrapFactory.java | 9 ++++++++- .../SingleUDPNettyServerBootstrapFactory.java | 8 +++++++- 8 files changed, 90 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e8e6cc4a/components/camel-netty4/src/main/docs/netty4.adoc ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/docs/netty4.adoc b/components/camel-netty4/src/main/docs/netty4.adoc index 2a01619..0c8d518 100644 --- a/components/camel-netty4/src/main/docs/netty4.adoc +++ b/components/camel-netty4/src/main/docs/netty4.adoc @@ -77,8 +77,10 @@ The Netty4 component supports 3 options which are listed below. + + // endpoint options: START -The Netty4 component supports 72 endpoint options which are listed below: +The Netty4 component supports 73 endpoint options which are listed below: {% raw %} [width="100%",cols="2s,1,1m,1m,5",options="header"] @@ -126,6 +128,7 @@ The Netty4 component supports 72 endpoint options which are listed below: | useByteBuf | producer (advanced) | false | boolean | If the useByteBuf is true netty producer will turn the message body into ByteBuf before sending it out. | bootstrapConfiguration | advanced | | NettyServerBootstrapConfiguration | To use a custom configured NettyServerBootstrapConfiguration for configuring this endpoint. | exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange +| nativeTransport | advanced | false | boolean | Whether to use native transport instead of NIO. Native transport takes advantage of the host operating system and is only supported on some platforms. You need to add the netty JAR for the host operating system you are using. See more details at: http://netty.io/wiki/native-transports.html | options | advanced | | Map | Allows to configure additional netty options using option. as prefix. For example option.child.keepAlive=false to set the netty option child.keepAlive=false. See the Netty documentation for possible options that can be used. | receiveBufferSize | advanced | 65536 | int | The TCP/UDP buffer sizes to be used during inbound communication. Size is bytes. | receiveBufferSizePredictor | advanced | | int | Configures the buffer size predictor. See details at Jetty documentation and this mail thread. @@ -163,6 +166,8 @@ The Netty4 component supports 72 endpoint options which are listed below: + + [[Netty4-RegistrybasedOptions]] Registry based Options ^^^^^^^^^^^^^^^^^^^^^^ http://git-wip-us.apache.org/repos/asf/camel/blob/e8e6cc4a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientModeTCPNettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientModeTCPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientModeTCPNettyServerBootstrapFactory.java index fc9cb2c..653da19 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientModeTCPNettyServerBootstrapFactory.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientModeTCPNettyServerBootstrapFactory.java @@ -29,6 +29,7 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import org.apache.camel.CamelContext; @@ -120,6 +121,7 @@ public class ClientModeTCPNettyServerBootstrapFactory extends ServiceSupport imp if (wg == null) { // create new pool which we should shutdown when stopping as its not shared workerGroup = new NettyWorkerPoolBuilder() + .withNativeTransport(configuration.isNativeTransport()) .withWorkerCount(configuration.getWorkerCount()) .withName("NettyServerTCPWorker") .build(); @@ -127,7 +129,11 @@ public class ClientModeTCPNettyServerBootstrapFactory extends ServiceSupport imp } clientBootstrap = new Bootstrap(); - clientBootstrap.channel(NioSocketChannel.class); + if (configuration.isNativeTransport()) { + clientBootstrap.channel(EpollSocketChannel.class); + } else { + clientBootstrap.channel(NioSocketChannel.class); + } clientBootstrap.group(wg); clientBootstrap.option(ChannelOption.SO_KEEPALIVE, configuration.isKeepAlive()); clientBootstrap.option(ChannelOption.TCP_NODELAY, configuration.isTcpNoDelay()); http://git-wip-us.apache.org/repos/asf/camel/blob/e8e6cc4a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java index c181ebf..a6bce2d 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java @@ -30,6 +30,8 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollDatagramChannel; +import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; @@ -93,7 +95,9 @@ public class NettyProducer extends DefaultAsyncProducer { super.doStart(); if (configuration.getWorkerGroup() == null) { // create new pool which we should shutdown when stopping as its not shared - workerGroup = new NettyWorkerPoolBuilder().withWorkerCount(configuration.getWorkerCount()) + workerGroup = new NettyWorkerPoolBuilder() + .withNativeTransport(configuration.isNativeTransport()) + .withWorkerCount(configuration.getWorkerCount()) .withName("NettyClientTCPWorker").build(); } @@ -391,7 +395,11 @@ public class NettyProducer extends DefaultAsyncProducer { if (isTcp()) { // its okay to create a new bootstrap for each new channel Bootstrap clientBootstrap = new Bootstrap(); - clientBootstrap.channel(NioSocketChannel.class); + if (configuration.isNativeTransport()) { + clientBootstrap.channel(EpollSocketChannel.class); + } else { + clientBootstrap.channel(NioSocketChannel.class); + } clientBootstrap.group(getWorkerGroup()); clientBootstrap.option(ChannelOption.SO_KEEPALIVE, configuration.isKeepAlive()); clientBootstrap.option(ChannelOption.TCP_NODELAY, configuration.isTcpNoDelay()); @@ -418,7 +426,11 @@ public class NettyProducer extends DefaultAsyncProducer { } else { // its okay to create a new bootstrap for each new channel Bootstrap connectionlessClientBootstrap = new Bootstrap(); - connectionlessClientBootstrap.channel(NioDatagramChannel.class); + if (configuration.isNativeTransport()) { + connectionlessClientBootstrap.channel(EpollDatagramChannel.class); + } else { + connectionlessClientBootstrap.channel(NioDatagramChannel.class); + } connectionlessClientBootstrap.group(getWorkerGroup()); connectionlessClientBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.getConnectTimeout()); connectionlessClientBootstrap.option(ChannelOption.SO_BROADCAST, configuration.isBroadcast()); http://git-wip-us.apache.org/repos/asf/camel/blob/e8e6cc4a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java index 5db1452..3972da5 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java @@ -93,6 +93,8 @@ public class NettyServerBootstrapConfiguration implements Cloneable { protected String enabledProtocols = DEFAULT_ENABLED_PROTOCOLS; @UriParam(label = "security") protected String passphrase; + @UriParam(label = "advanced") + protected boolean nativeTransport; @UriParam(label = "consumer,advanced") protected EventLoopGroup bossGroup; @UriParam(label = "consumer,advanced") @@ -465,6 +467,18 @@ public class NettyServerBootstrapConfiguration implements Cloneable { this.options = options; } + public boolean isNativeTransport() { + return nativeTransport; + } + + /** + * Whether to use native transport instead of NIO. Native transport takes advantage of the host operating system and is only supported on some platforms. + * You need to add the netty JAR for the host operating system you are using. See more details at: http://netty.io/wiki/native-transports.html + */ + public void setNativeTransport(boolean nativeTransport) { + this.nativeTransport = nativeTransport; + } + public EventLoopGroup getBossGroup() { return bossGroup; } @@ -475,7 +489,7 @@ public class NettyServerBootstrapConfiguration implements Cloneable { public void setBossGroup(EventLoopGroup bossGroup) { this.bossGroup = bossGroup; } - + public EventLoopGroup getWorkerGroup() { return workerGroup; } http://git-wip-us.apache.org/repos/asf/camel/blob/e8e6cc4a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java index 4f1611e..75d3d13 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java @@ -17,6 +17,7 @@ package org.apache.camel.component.netty4; import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import org.apache.camel.util.concurrent.CamelThreadFactory; @@ -30,6 +31,7 @@ public final class NettyServerBossPoolBuilder { private String name = "NettyServerBoss"; private String pattern; private int bossCount = 1; + private boolean nativeTransport; public void setName(String name) { this.name = name; @@ -43,6 +45,10 @@ public final class NettyServerBossPoolBuilder { this.bossCount = bossCount; } + public void setNativeTransport(boolean nativeTransport) { + this.nativeTransport = nativeTransport; + } + public NettyServerBossPoolBuilder withName(String name) { setName(name); return this; @@ -58,10 +64,19 @@ public final class NettyServerBossPoolBuilder { return this; } + public NettyServerBossPoolBuilder withNativeTransport(boolean nativeTransport) { + setNativeTransport(nativeTransport); + return this; + } + /** * Creates a new boss pool. */ public EventLoopGroup build() { - return new NioEventLoopGroup(bossCount, new CamelThreadFactory(pattern, name, false)); + if (nativeTransport) { + return new EpollEventLoopGroup(bossCount, new CamelThreadFactory(pattern, name, false)); + } else { + return new NioEventLoopGroup(bossCount, new CamelThreadFactory(pattern, name, false)); + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/e8e6cc4a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java index 125bbce..e667cc6 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java @@ -17,6 +17,7 @@ package org.apache.camel.component.netty4; import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import org.apache.camel.util.concurrent.CamelThreadFactory; @@ -29,6 +30,7 @@ public final class NettyWorkerPoolBuilder { private String name = "NettyWorker"; private String pattern; private int workerCount; + private boolean nativeTransport; private volatile EventLoopGroup workerPool; public void setName(String name) { @@ -43,6 +45,10 @@ public final class NettyWorkerPoolBuilder { this.workerCount = workerCount; } + public void setNativeTransport(boolean nativeTransport) { + this.nativeTransport = nativeTransport; + } + public NettyWorkerPoolBuilder withName(String name) { setName(name); return this; @@ -58,12 +64,21 @@ public final class NettyWorkerPoolBuilder { return this; } + public NettyWorkerPoolBuilder withNativeTransport(boolean nativeTransport) { + setNativeTransport(nativeTransport); + return this; + } + /** * Creates a new worker pool. */ public EventLoopGroup build() { int count = workerCount > 0 ? workerCount : NettyHelper.DEFAULT_IO_THREADS; - workerPool = new NioEventLoopGroup(count, new CamelThreadFactory(pattern, name, false)); + if (nativeTransport) { + workerPool = new EpollEventLoopGroup(count, new CamelThreadFactory(pattern, name, false)); + } else { + workerPool = new NioEventLoopGroup(count, new CamelThreadFactory(pattern, name, false)); + } return workerPool; } http://git-wip-us.apache.org/repos/asf/camel/blob/e8e6cc4a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java index 9184fae..887113f 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java @@ -26,6 +26,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; @@ -141,6 +142,7 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme if (bg == null) { // create new pool which we should shutdown when stopping as its not shared bossGroup = new NettyServerBossPoolBuilder() + .withNativeTransport(configuration.isNativeTransport()) .withBossCount(configuration.getBossCount()) .withName("NettyServerTCPBoss") .build(); @@ -149,6 +151,7 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme if (wg == null) { // create new pool which we should shutdown when stopping as its not shared workerGroup = new NettyWorkerPoolBuilder() + .withNativeTransport(configuration.isNativeTransport()) .withWorkerCount(configuration.getWorkerCount()) .withName("NettyServerTCPWorker") .build(); @@ -156,7 +159,11 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme } serverBootstrap = new ServerBootstrap(); - serverBootstrap.group(bg, wg).channel(NioServerSocketChannel.class); + if (configuration.isNativeTransport()) { + serverBootstrap.group(bg, wg).channel(EpollServerSocketChannel.class); + } else { + serverBootstrap.group(bg, wg).channel(NioServerSocketChannel.class); + } serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, configuration.isKeepAlive()); serverBootstrap.childOption(ChannelOption.TCP_NODELAY, configuration.isTcpNoDelay()); serverBootstrap.option(ChannelOption.SO_REUSEADDR, configuration.isReuseAddress()); http://git-wip-us.apache.org/repos/asf/camel/blob/e8e6cc4a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java index dcaa262..3f68bd6 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java @@ -28,6 +28,7 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.epoll.EpollDatagramChannel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.socket.DatagramChannel; @@ -121,6 +122,7 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme if (wg == null) { // create new pool which we should shutdown when stopping as its not shared workerGroup = new NettyWorkerPoolBuilder() + .withNativeTransport(configuration.isNativeTransport()) .withWorkerCount(configuration.getWorkerCount()) .withName("NettyServerTCPWorker") .build(); @@ -128,7 +130,11 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme } Bootstrap bootstrap = new Bootstrap(); - bootstrap.group(wg).channel(NioDatagramChannel.class); + if (configuration.isNativeTransport()) { + bootstrap.group(wg).channel(EpollDatagramChannel.class); + } else { + bootstrap.group(wg).channel(NioDatagramChannel.class); + } // We cannot set the child option here bootstrap.option(ChannelOption.SO_REUSEADDR, configuration.isReuseAddress()); bootstrap.option(ChannelOption.SO_SNDBUF, configuration.getSendBufferSize());
