Updated Branches: refs/heads/trunk 2ea492202 -> 753e41379
FLUME-2238. Provide option to configure worker threads in NettyAvroRpcClient (Cameron Gandevia via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/753e4137 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/753e4137 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/753e4137 Branch: refs/heads/trunk Commit: 753e4137918b5bdf559dd50a21db2a832aa1dce3 Parents: 2ea4922 Author: Hari Shreedharan <[email protected]> Authored: Mon Dec 9 16:35:33 2013 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Mon Dec 9 16:36:26 2013 -0800 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 35 ++++---- .../apache/flume/api/NettyAvroRpcClient.java | 86 +++++++++++++++----- .../api/RpcClientConfigurationConstants.java | 6 ++ .../flume/api/TestNettyAvroRpcClient.java | 79 ++++++++++++++++-- 4 files changed, 162 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/753e4137/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 8687cb7..0737c44 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1638,25 +1638,26 @@ hostname / port pair. The events are taken from the configured Channel in batches of the configured batch size. Required properties are in **bold**. -========================== ======= ============================================== +========================== ===================================================== =========================================================================================== Property Name Default Description -========================== ======= ============================================== +========================== ===================================================== =========================================================================================== **channel** -- -**type** -- The component type name, needs to be ``avro``. -**hostname** -- The hostname or IP address to bind to. -**port** -- The port # to listen on. -batch-size 100 number of event to batch together for send. -connect-timeout 20000 Amount of time (ms) to allow for the first (handshake) request. -request-timeout 20000 Amount of time (ms) to allow for requests after the first. -reset-connection-interval none Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when news hosts are added without having to restart the agent. -compression-type none This can be "none" or "deflate". The compression-type must match the compression-type of matching AvroSource -compression-level 6 The level of compression to compress event. 0 = no compression and 1-9 is compression. The higher the number the more compression -ssl false Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a "truststore", "truststore-password", "truststore-type", and specify whether to "trust-all-certs". -trust-all-certs false If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and "listen in" on the encrypted connection. -truststore -- The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source's SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used. -truststore-password -- The password for the specified truststore. -truststore-type JKS The type of the Java truststore. This can be "JKS" or other supported Java truststore type. -========================== ======= ============================================== +**type** -- The component type name, needs to be ``avro``. +**hostname** -- The hostname or IP address to bind to. +**port** -- The port # to listen on. +batch-size 100 number of event to batch together for send. +connect-timeout 20000 Amount of time (ms) to allow for the first (handshake) request. +request-timeout 20000 Amount of time (ms) to allow for requests after the first. +reset-connection-interval none Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when news hosts are added without having to restart the agent. +compression-type none This can be "none" or "deflate". The compression-type must match the compression-type of matching AvroSource +compression-level 6 The level of compression to compress event. 0 = no compression and 1-9 is compression. The higher the number the more compression +ssl false Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a "truststore", "truststore-password", "truststore-type", and specify whether to "trust-all-certs". +trust-all-certs false If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and "listen in" on the encrypted connection. +truststore -- The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source's SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used. +truststore-password -- The password for the specified truststore. +truststore-type JKS The type of the Java truststore. This can be "JKS" or other supported Java truststore type. +maxIoWorkers 2 * the number of available processors in the machine The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory. +========================== ===================================================== =========================================================================================== Example for agent named a1: http://git-wip-us.apache.org/repos/asf/flume/blob/753e4137/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java index 9aabdd4..a2eb264 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java @@ -45,6 +45,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; + import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import javax.net.ssl.TrustManager; @@ -55,6 +56,7 @@ import org.apache.avro.ipc.CallFuture; import org.apache.avro.ipc.NettyTransceiver; import org.apache.avro.ipc.Transceiver; import org.apache.avro.ipc.specific.SpecificRequestor; +import org.apache.commons.lang.StringUtils; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; @@ -99,6 +101,7 @@ implements RpcClient { .getLogger(NettyAvroRpcClient.class); private boolean enableDeflateCompression; private int compressionLevel; + private int maxIoWorkers; /** * This constructor is intended to be called from {@link RpcClientFactory}. @@ -128,20 +131,34 @@ implements RpcClient { try { + ExecutorService bossExecutor = + Executors.newCachedThreadPool(new TransceiverThreadFactory( + "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")); + ExecutorService workerExecutor = + Executors.newCachedThreadPool(new TransceiverThreadFactory( + "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")); + if (enableDeflateCompression || enableSsl) { - socketChannelFactory = new SSLCompressionChannelFactory( - Executors.newCachedThreadPool(new TransceiverThreadFactory( - "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")), - Executors.newCachedThreadPool(new TransceiverThreadFactory( - "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")), - enableDeflateCompression, enableSsl, trustAllCerts, compressionLevel, - truststore, truststorePassword, truststoreType); + if (maxIoWorkers >= 1) { + socketChannelFactory = new SSLCompressionChannelFactory( + bossExecutor, workerExecutor, + enableDeflateCompression, enableSsl, trustAllCerts, + compressionLevel, truststore, truststorePassword, truststoreType, + maxIoWorkers); + } else { + socketChannelFactory = new SSLCompressionChannelFactory( + bossExecutor, workerExecutor, + enableDeflateCompression, enableSsl, trustAllCerts, + compressionLevel, truststore, truststorePassword, truststoreType); + } } else { - socketChannelFactory = new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(new TransceiverThreadFactory( - "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")), - Executors.newCachedThreadPool(new TransceiverThreadFactory( - "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"))); + if (maxIoWorkers >= 1) { + socketChannelFactory = new NioClientSocketChannelFactory( + bossExecutor, workerExecutor, maxIoWorkers); + } else { + socketChannelFactory = new NioClientSocketChannelFactory( + bossExecutor, workerExecutor); + } } transceiver = new NettyTransceiver(this.address, @@ -587,6 +604,23 @@ implements RpcClient { truststoreType = properties.getProperty( RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE, "JKS"); + String maxIoWorkersStr = properties.getProperty( + RpcClientConfigurationConstants.MAX_IO_WORKERS); + if (!StringUtils.isEmpty(maxIoWorkersStr)) { + try { + maxIoWorkers = Integer.parseInt(maxIoWorkersStr); + } catch (NumberFormatException ex) { + logger.warn ("Invalid maxIOWorkers:" + maxIoWorkersStr + " Using " + + "default maxIOWorkers."); + maxIoWorkers = -1; + } + } + + if (maxIoWorkers < 1) { + logger.warn("Using default maxIOWorkers"); + maxIoWorkers = -1; + } + this.connect(); } @@ -628,13 +662,13 @@ implements RpcClient { */ private static class SSLCompressionChannelFactory extends NioClientSocketChannelFactory { - private boolean enableCompression; - private int compressionLevel; - private boolean enableSsl; - private boolean trustAllCerts; - private String truststore; - private String truststorePassword; - private String truststoreType; + private final boolean enableCompression; + private final int compressionLevel; + private final boolean enableSsl; + private final boolean trustAllCerts; + private final String truststore; + private final String truststorePassword; + private final String truststoreType; public SSLCompressionChannelFactory(Executor bossExecutor, Executor workerExecutor, boolean enableCompression, boolean enableSsl, boolean trustAllCerts, @@ -650,6 +684,20 @@ implements RpcClient { this.truststoreType = truststoreType; } + public SSLCompressionChannelFactory(Executor bossExecutor, Executor workerExecutor, + boolean enableCompression, boolean enableSsl, boolean trustAllCerts, + int compressionLevel, String truststore, String truststorePassword, + String truststoreType, int maxIOWorkers) { + super(bossExecutor, workerExecutor, maxIOWorkers); + this.enableCompression = enableCompression; + this.enableSsl = enableSsl; + this.compressionLevel = compressionLevel; + this.trustAllCerts = trustAllCerts; + this.truststore = truststore; + this.truststorePassword = truststorePassword; + this.truststoreType = truststoreType; + } + @Override public SocketChannel newChannel(ChannelPipeline pipeline) { TrustManager[] managers; http://git-wip-us.apache.org/repos/asf/flume/blob/753e4137/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java index 7aa70cb..136c504 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java @@ -144,6 +144,12 @@ public final class RpcClientConfigurationConstants { public static final String CONFIG_TRUSTSTORE_PASSWORD = "truststore-password"; public static final String CONFIG_TRUSTSTORE_TYPE = "truststore-type"; + /** + * Configuration constants for the NettyAvroRpcClient + * NioClientSocketChannelFactory + */ + public static final String MAX_IO_WORKERS = "maxIoWorkers"; + private RpcClientConfigurationConstants() { // disable explicit object creation } http://git-wip-us.apache.org/repos/asf/flume/blob/753e4137/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java index bfb1fa6..cf4f415 100644 --- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java +++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java @@ -18,28 +18,22 @@ */ package org.apache.flume.api; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.net.InetSocketAddress; -import java.net.ServerSocket; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.Properties; -import org.junit.Test; - import org.apache.avro.ipc.Server; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; -import org.apache.flume.event.EventBuilder; - import org.apache.flume.api.RpcTestUtils.FailedAvroHandler; import org.apache.flume.api.RpcTestUtils.OKAvroHandler; import org.apache.flume.api.RpcTestUtils.ThrowingAvroHandler; import org.apache.flume.api.RpcTestUtils.UnknownAvroHandler; +import org.apache.flume.event.EventBuilder; import org.junit.Assert; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -333,4 +327,73 @@ public class TestNettyAvroRpcClient { RpcTestUtils.handlerBatchAppendTest(new ThrowingAvroHandler()); logger.error("Throwing: I should never have gotten here!"); } + + /** + * configure the NettyAvroRpcClient with a non-default + * NioClientSocketChannelFactory number of io worker threads + * + * @throws FlumeException + * @throws EventDeliveryException + */ + @Test + public void testAppendWithMaxIOWorkers() throws FlumeException, EventDeliveryException { + NettyAvroRpcClient client = null; + Server server = RpcTestUtils.startServer(new OKAvroHandler()); + Properties props = new Properties(); + props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "localhost"); + props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "localhost", localhost + + ":" + server.getPort()); + props.setProperty(RpcClientConfigurationConstants.MAX_IO_WORKERS, Integer.toString(2)); + try { + client = new NettyAvroRpcClient(); + client.configure(props); + for (int i = 0; i < 5; i++) { + client.append(EventBuilder.withBody("evt:" + i, Charset.forName("UTF8"))); + } + } finally { + RpcTestUtils.stopServer(server); + if (client != null) { + client.close(); + } + } + } + + /** + * Simple request with compression on the server and client with compression + * level 0 + * + * configure the NettyAvroRpcClient with a non-default + * NioClientSocketChannelFactory number of io worker threads + * + * Compression level 0 = no compression + * + * @throws FlumeException + * @throws EventDeliveryException + */ + @Test + public void testAppendWithMaxIOWorkersSimpleCompressionLevel0() throws FlumeException, + EventDeliveryException { + NettyAvroRpcClient client = null; + Server server = RpcTestUtils.startServer(new OKAvroHandler(), 0, true); + Properties props = new Properties(); + props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "localhost"); + props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "localhost", localhost + + ":" + server.getPort()); + props.setProperty(RpcClientConfigurationConstants.MAX_IO_WORKERS, Integer.toString(2)); + props.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE, "deflate"); + props.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_LEVEL, "" + 0); + + try { + client = new NettyAvroRpcClient(); + client.configure(props); + for (int i = 0; i < 5; i++) { + client.append(EventBuilder.withBody("evt:" + i, Charset.forName("UTF8"))); + } + } finally { + RpcTestUtils.stopServer(server); + if (client != null) { + client.close(); + } + } + } }
