Updated Branches:
  refs/heads/trunk 2ea492202 -> 753e41379

FLUME-2238. Provide option to configure worker threads in NettyAvroRpcClient

(Cameron Gandevia via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/753e4137
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/753e4137
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/753e4137

Branch: refs/heads/trunk
Commit: 753e4137918b5bdf559dd50a21db2a832aa1dce3
Parents: 2ea4922
Author: Hari Shreedharan <[email protected]>
Authored: Mon Dec 9 16:35:33 2013 -0800
Committer: Hari Shreedharan <[email protected]>
Committed: Mon Dec 9 16:36:26 2013 -0800

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          | 35 ++++----
 .../apache/flume/api/NettyAvroRpcClient.java    | 86 +++++++++++++++-----
 .../api/RpcClientConfigurationConstants.java    |  6 ++
 .../flume/api/TestNettyAvroRpcClient.java       | 79 ++++++++++++++++--
 4 files changed, 162 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/753e4137/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 8687cb7..0737c44 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1638,25 +1638,26 @@ hostname / port pair. The events are taken from the configured Channel in
 batches of the configured batch size.
 Required properties are in **bold**.
 
-==========================   =======  ==============================================
+==========================   =====================================================  ===========================================================================================
 Property Name                Default  Description
-==========================   =======  ==============================================
+==========================   =====================================================  ===========================================================================================
 **channel**                  --
-**type**                     --       The component type name, needs to be ``avro``.
-**hostname**                 --       The hostname or IP address to bind to.
-**port**                     --       The port # to listen on.
-batch-size                   100      number of event to batch together for send.
-connect-timeout              20000    Amount of time (ms) to allow for the first (handshake) request.
-request-timeout              20000    Amount of time (ms) to allow for requests after the first.
-reset-connection-interval    none     Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when news hosts are added without having to restart the agent.
-compression-type             none     This can be "none" or "deflate".  The compression-type must match the compression-type of matching AvroSource
-compression-level            6        The level of compression to compress event. 0 = no compression and 1-9 is compression.  The higher the number the more compression
-ssl                          false    Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a "truststore", "truststore-password", "truststore-type", and specify whether to "trust-all-certs".
-trust-all-certs              false    If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and "listen in" on the encrypted connection.
-truststore                   --       The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source's SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used.
-truststore-password          --       The password for the specified truststore.
-truststore-type              JKS      The type of the Java truststore. This can be "JKS" or other supported Java truststore type.
-==========================   =======  ==============================================
+**type**                     --                                                     The component type name, needs to be ``avro``.
+**hostname**                 --                                                     The hostname or IP address to bind to.
+**port**                     --                                                     The port # to listen on.
+batch-size                   100                                                    number of event to batch together for send.
+connect-timeout              20000                                                  Amount of time (ms) to allow for the first (handshake) request.
+request-timeout              20000                                                  Amount of time (ms) to allow for requests after the first.
+reset-connection-interval    none                                                   Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when news hosts are added without having to restart the agent.
+compression-type             none                                                   This can be "none" or "deflate".  The compression-type must match the compression-type of matching AvroSource
+compression-level            6                                                      The level of compression to compress event. 0 = no compression and 1-9 is compression.  The higher the number the more compression
+ssl                          false                                                  Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a "truststore", "truststore-password", "truststore-type", and specify whether to "trust-all-certs".
+trust-all-certs              false                                                  If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and "listen in" on the encrypted connection.
+truststore                   --                                                     The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source's SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used.
+truststore-password          --                                                     The password for the specified truststore.
+truststore-type              JKS                                                    The type of the Java truststore. This can be "JKS" or other supported Java truststore type.
+maxIoWorkers                 2 * the number of available processors in the machine  The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory.
+==========================   =====================================================  ===========================================================================================
 
 Example for agent named a1:
 

http://git-wip-us.apache.org/repos/asf/flume/blob/753e4137/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
index 9aabdd4..a2eb264 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
@@ -45,6 +45,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.TrustManager;
@@ -55,6 +56,7 @@ import org.apache.avro.ipc.CallFuture;
 import org.apache.avro.ipc.NettyTransceiver;
 import org.apache.avro.ipc.Transceiver;
 import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
@@ -99,6 +101,7 @@ implements RpcClient {
       .getLogger(NettyAvroRpcClient.class);
   private boolean enableDeflateCompression;
   private int compressionLevel;
+  private int maxIoWorkers;
 
   /**
    * This constructor is intended to be called from {@link RpcClientFactory}.
@@ -128,20 +131,34 @@ implements RpcClient {
 
     try {
 
+      ExecutorService bossExecutor =
+        Executors.newCachedThreadPool(new TransceiverThreadFactory(
+          "Avro " + NettyTransceiver.class.getSimpleName() + " Boss"));
+      ExecutorService workerExecutor =
+        Executors.newCachedThreadPool(new TransceiverThreadFactory(
+          "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"));
+
       if (enableDeflateCompression || enableSsl) {
-        socketChannelFactory = new SSLCompressionChannelFactory(
-            Executors.newCachedThreadPool(new TransceiverThreadFactory(
-                "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
-            Executors.newCachedThreadPool(new TransceiverThreadFactory(
-                "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")),
-            enableDeflateCompression, enableSsl, trustAllCerts, compressionLevel,
-            truststore, truststorePassword, truststoreType);
+        if (maxIoWorkers >= 1) {
+          socketChannelFactory = new SSLCompressionChannelFactory(
+            bossExecutor, workerExecutor,
+            enableDeflateCompression, enableSsl, trustAllCerts,
+            compressionLevel, truststore, truststorePassword, truststoreType,
+            maxIoWorkers);
+        } else {
+          socketChannelFactory = new SSLCompressionChannelFactory(
+            bossExecutor, workerExecutor,
+            enableDeflateCompression, enableSsl, trustAllCerts,
+            compressionLevel, truststore, truststorePassword, truststoreType);
+        }
       } else {
-        socketChannelFactory = new NioClientSocketChannelFactory(
-            Executors.newCachedThreadPool(new TransceiverThreadFactory(
-                "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
-            Executors.newCachedThreadPool(new TransceiverThreadFactory(
-                "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")));
+        if (maxIoWorkers >= 1) {
+          socketChannelFactory = new NioClientSocketChannelFactory(
+              bossExecutor, workerExecutor, maxIoWorkers);
+        } else {
+          socketChannelFactory = new NioClientSocketChannelFactory(
+              bossExecutor, workerExecutor);
+        }
       }
 
       transceiver = new NettyTransceiver(this.address,
@@ -587,6 +604,23 @@ implements RpcClient {
     truststoreType = properties.getProperty(
         RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE, "JKS");
 
+    String maxIoWorkersStr = properties.getProperty(
+      RpcClientConfigurationConstants.MAX_IO_WORKERS);
+    if (!StringUtils.isEmpty(maxIoWorkersStr)) {
+      try {
+        maxIoWorkers = Integer.parseInt(maxIoWorkersStr);
+      } catch (NumberFormatException ex) {
+        logger.warn ("Invalid maxIOWorkers:" + maxIoWorkersStr + " Using " +
+          "default maxIOWorkers.");
+        maxIoWorkers = -1;
+      }
+    }
+
+    if (maxIoWorkers < 1) {
+      logger.warn("Using default maxIOWorkers");
+      maxIoWorkers = -1;
+    }
+
     this.connect();
   }
 
@@ -628,13 +662,13 @@ implements RpcClient {
    */
   private static class SSLCompressionChannelFactory extends NioClientSocketChannelFactory {
 
-    private boolean enableCompression;
-    private int compressionLevel;
-    private boolean enableSsl;
-    private boolean trustAllCerts;
-    private String truststore;
-    private String truststorePassword;
-    private String truststoreType;
+    private final boolean enableCompression;
+    private final int compressionLevel;
+    private final boolean enableSsl;
+    private final boolean trustAllCerts;
+    private final String truststore;
+    private final String truststorePassword;
+    private final String truststoreType;
 
     public SSLCompressionChannelFactory(Executor bossExecutor, Executor workerExecutor,
         boolean enableCompression, boolean enableSsl, boolean trustAllCerts,
@@ -650,6 +684,20 @@ implements RpcClient {
       this.truststoreType = truststoreType;
     }
 
+    public SSLCompressionChannelFactory(Executor bossExecutor, Executor workerExecutor,
+        boolean enableCompression, boolean enableSsl, boolean trustAllCerts,
+        int compressionLevel, String truststore, String truststorePassword,
+        String truststoreType, int maxIOWorkers) {
+      super(bossExecutor, workerExecutor, maxIOWorkers);
+      this.enableCompression = enableCompression;
+      this.enableSsl = enableSsl;
+      this.compressionLevel = compressionLevel;
+      this.trustAllCerts = trustAllCerts;
+      this.truststore = truststore;
+      this.truststorePassword = truststorePassword;
+      this.truststoreType = truststoreType;
+    }
+
     @Override
     public SocketChannel newChannel(ChannelPipeline pipeline) {
       TrustManager[] managers;

http://git-wip-us.apache.org/repos/asf/flume/blob/753e4137/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
index 7aa70cb..136c504 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
@@ -144,6 +144,12 @@ public final class RpcClientConfigurationConstants {
   public static final String CONFIG_TRUSTSTORE_PASSWORD = "truststore-password";
   public static final String CONFIG_TRUSTSTORE_TYPE = "truststore-type";
 
+  /**
+   * Configuration constants for the NettyAvroRpcClient
+   * NioClientSocketChannelFactory
+   */
+  public static final String MAX_IO_WORKERS = "maxIoWorkers";
+
   private RpcClientConfigurationConstants() {
     // disable explicit object creation
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/753e4137/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
index bfb1fa6..cf4f415 100644
--- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
@@ -18,28 +18,22 @@
  */
 package org.apache.flume.api;
 
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
-import org.junit.Test;
-
 import org.apache.avro.ipc.Server;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
-import org.apache.flume.event.EventBuilder;
-
 import org.apache.flume.api.RpcTestUtils.FailedAvroHandler;
 import org.apache.flume.api.RpcTestUtils.OKAvroHandler;
 import org.apache.flume.api.RpcTestUtils.ThrowingAvroHandler;
 import org.apache.flume.api.RpcTestUtils.UnknownAvroHandler;
+import org.apache.flume.event.EventBuilder;
 import org.junit.Assert;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -333,4 +327,73 @@ public class TestNettyAvroRpcClient {
     RpcTestUtils.handlerBatchAppendTest(new ThrowingAvroHandler());
     logger.error("Throwing: I should never have gotten here!");
   }
+
+  /**
+   * configure the NettyAvroRpcClient with a non-default
+   * NioClientSocketChannelFactory number of io worker threads
+   *
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testAppendWithMaxIOWorkers() throws FlumeException, EventDeliveryException {
+    NettyAvroRpcClient client = null;
+    Server server = RpcTestUtils.startServer(new OKAvroHandler());
+    Properties props = new Properties();
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "localhost");
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "localhost", localhost
+        + ":" + server.getPort());
+    props.setProperty(RpcClientConfigurationConstants.MAX_IO_WORKERS, Integer.toString(2));
+    try {
+      client = new NettyAvroRpcClient();
+      client.configure(props);
+      for (int i = 0; i < 5; i++) {
+        client.append(EventBuilder.withBody("evt:" + i, Charset.forName("UTF8")));
+      }
+    } finally {
+      RpcTestUtils.stopServer(server);
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
+
+  /**
+   * Simple request with compression on the server and client with compression
+   * level 0
+   *
+   * configure the NettyAvroRpcClient with a non-default
+   * NioClientSocketChannelFactory number of io worker threads
+   *
+   * Compression level 0 = no compression
+   *
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testAppendWithMaxIOWorkersSimpleCompressionLevel0() throws FlumeException,
+      EventDeliveryException {
+    NettyAvroRpcClient client = null;
+    Server server = RpcTestUtils.startServer(new OKAvroHandler(), 0, true);
+    Properties props = new Properties();
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "localhost");
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "localhost", localhost
+        + ":" + server.getPort());
+    props.setProperty(RpcClientConfigurationConstants.MAX_IO_WORKERS, Integer.toString(2));
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE, "deflate");
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_LEVEL, "" + 0);
+
+    try {
+      client = new NettyAvroRpcClient();
+      client.configure(props);
+      for (int i = 0; i < 5; i++) {
+        client.append(EventBuilder.withBody("evt:" + i, Charset.forName("UTF8")));
+      }
+    } finally {
+      RpcTestUtils.stopServer(server);
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
 }
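
The maxIoWorkers handling added to NettyAvroRpcClient.configure() in this
commit reads the property, falls back to a sentinel of -1 when the value is
missing, non-numeric, or below 1, and later uses that sentinel to pick the
NioClientSocketChannelFactory constructor without an explicit worker count.
The following is a minimal standalone sketch of that fallback rule only. It
assumes nothing beyond java.util.Properties; the class name
MaxIoWorkersSketch, the method resolveMaxIoWorkers, and the constant
USE_DEFAULT are hypothetical and are not part of Flume. Only the property key
"maxIoWorkers" is taken from the diff above.

```java
import java.util.Properties;

// Hypothetical illustration class; not part of the Flume SDK.
public class MaxIoWorkersSketch {

    // Property key as defined by RpcClientConfigurationConstants.MAX_IO_WORKERS in the diff.
    static final String MAX_IO_WORKERS = "maxIoWorkers";

    // Sentinel the patch stores when the default worker count should be used.
    // The name USE_DEFAULT is an assumption made for this sketch.
    static final int USE_DEFAULT = -1;

    // Returns the configured worker count, or USE_DEFAULT when the property is
    // absent, empty, not an integer, or smaller than 1.
    static int resolveMaxIoWorkers(Properties props) {
        String raw = props.getProperty(MAX_IO_WORKERS);
        if (raw == null || raw.isEmpty()) {
            return USE_DEFAULT;
        }
        try {
            int parsed = Integer.parseInt(raw);
            return parsed >= 1 ? parsed : USE_DEFAULT;
        } catch (NumberFormatException ex) {
            return USE_DEFAULT;
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(resolveMaxIoWorkers(props)); // prints -1 (property not set)

        props.setProperty(MAX_IO_WORKERS, "2");
        System.out.println(resolveMaxIoWorkers(props)); // prints 2

        props.setProperty(MAX_IO_WORKERS, "abc");
        System.out.println(resolveMaxIoWorkers(props)); // prints -1 (not a number)

        props.setProperty(MAX_IO_WORKERS, "0");
        System.out.println(resolveMaxIoWorkers(props)); // prints -1 (below 1)
    }
}
```

Unlike the patch, the sketch does not log a warning on fallback; it only shows
which inputs end up on the default path.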
