Updated Branches:
  refs/heads/flume-1.4 9efb9f23e -> f49607c4c

FLUME-2083. Avro Source should not start if SSL is enabled and keystore cannot be opened

(Mike Percy via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f49607c4
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f49607c4
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f49607c4

Branch: refs/heads/flume-1.4
Commit: f49607c4cf6470a135f661b68c56c971617ca033
Parents: 9efb9f2
Author: Hari Shreedharan <[email protected]>
Authored: Tue Jun 18 16:27:24 2013 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Tue Jun 18 16:29:30 2013 -0700

----------------------------------------------------------------------
 .../org/apache/flume/source/AvroSource.java     | 15 +++++++++---
 .../org/apache/flume/sink/TestAvroSink.java     |  1 +
 .../org/apache/flume/source/TestAvroSource.java |  2 +-
 flume-ng-doc/sphinx/FlumeUserGuide.rst          | 20 ++++++++--------
 .../apache/flume/api/NettyAvroRpcClient.java    | 24 ++++++++++----------
 5 files changed, 37 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/f49607c4/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index edc2574..f23cd93 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -19,6 +19,7 @@
 
 package org.apache.flume.source;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import java.io.FileInputStream;
 import java.net.InetSocketAddress;
@@ -44,6 +45,7 @@ import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
+import org.apache.flume.FlumeException;
 import org.apache.flume.Source;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.Configurables;
@@ -128,6 +130,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
   private static final String PORT_KEY = "port";
   private static final String BIND_KEY = "bind";
   private static final String COMPRESSION_TYPE = "compression-type";
+  private static final String SSL_KEY = "ssl";
   private static final String KEYSTORE_KEY = "keystore";
   private static final String KEYSTORE_PASSWORD_KEY = "keystore-password";
   private static final String KEYSTORE_TYPE_KEY = "keystore-type";
@@ -160,16 +163,22 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
               context.getString(THREADS));
     }
 
+    enableSsl = context.getBoolean(SSL_KEY, false);
     keystore = context.getString(KEYSTORE_KEY);
     keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY);
     keystoreType = context.getString(KEYSTORE_TYPE_KEY, "JKS");
-    if (keystore != null && keystorePassword != null) {
+
+    if (enableSsl) {
+      Preconditions.checkNotNull(keystore,
+          KEYSTORE_KEY + " must be specified when SSL is enabled");
+      Preconditions.checkNotNull(keystorePassword,
+          KEYSTORE_PASSWORD_KEY + " must be specified when SSL is enabled");
       try {
         KeyStore ks = KeyStore.getInstance(keystoreType);
         ks.load(new FileInputStream(keystore), keystorePassword.toCharArray());
-        enableSsl = true;
       } catch (Exception ex) {
-        logger.warn("AVRO source configured with invalid keystore " + keystore, ex);
+        throw new FlumeException(
+            "Avro source configured with invalid keystore: " + keystore, ex);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/f49607c4/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
index 202b882..8760c25 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
@@ -474,6 +474,7 @@ public class TestAvroSink {
     context.put("bind", hostname);
     context.put("threads", "50");
     context.put("compression-type", "deflate");
+    context.put("ssl", String.valueOf(true));
     context.put("keystore", "src/test/resources/server.p12");
     context.put("keystore-password", "password");
     context.put("keystore-type", "PKCS12");

http://git-wip-us.apache.org/repos/asf/flume/blob/f49607c4/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
index 8fd7072..2667a6f 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
@@ -270,7 +270,7 @@ public class TestAvroSource {
 
         context.put("port", String.valueOf(selectedPort = 41414 + i));
         context.put("bind", "0.0.0.0");
-        context.put("threads", "50");
+        context.put("ssl", "true");
         context.put("keystore", "src/test/resources/server.p12");
         context.put("keystore-password", "password");
         context.put("keystore-type", "PKCS12");

http://git-wip-us.apache.org/repos/asf/flume/blob/f49607c4/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index a8f84e5..74863d4 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -684,7 +684,7 @@ Avro Source
 ~~~~~~~~~~~
 
 Listens on Avro port and receives events from external Avro client streams.
-When paired with the built-in AvroSink on another (previous hop) Flume agent,
+When paired with the built-in Avro Sink on another (previous hop) Flume agent,
 it can create tiered collection topologies.
 Required properties are in **bold**.
 
@@ -701,9 +701,10 @@ selector.*
 interceptors         --           Space-separated list of interceptors
 interceptors.*
 compression-type     none         This can be "none" or "deflate".  The compression-type must match the compression-type of matching AvroSource
-keystore             --           The path to a Java keystore. If "keystore" and "keystore-password" are both set, then this AvroSource will us SSL.
-keystore-password    --           The password for the Java keystore.
-keystore-type        JKS          This can be "JKS" or "PKCS12". The type of the Java keystore.
+ssl                  false        Set this to true to enable SSL encryption. You must also specify a "keystore" and a "keystore-password".
+keystore             --           This is the path to a Java keystore file. Required for SSL.
+keystore-password    --           The password for the Java keystore. Required for SSL.
+keystore-type        JKS          The type of the Java keystore. This can be "JKS" or "PKCS12".
 ==================   ===========  ===================================================
 
 Example for agent named a1:
@@ -1529,11 +1530,12 @@ connect-timeout              20000    Amount of time (ms) to allow for the first
 request-timeout              20000    Amount of time (ms) to allow for requests after the first.
 reset-connection-interval    none     Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when news hosts are added without having to restart the agent.
 compression-type             none     This can be "none" or "deflate".  The compression-type must match the compression-type of matching AvroSource
-compression-level            6       The level of compression to compress event. 0 = no compression and 1-9 is compression.  The higher the number the more compression
-ssl                   false    Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a "truststore", "truststore-password", and "truststore-type".
-truststore            --       The path to a Java truststore file. If you enable SSL without configuring a truststore, the AvroSink will automatically use a permisive trust setting and accept any server certifacte used by the AvroSource it is connected to.
-truststore-password   --       The password for the truststore.
-truststore-type       JKS      This can be "JKS" or other supported Java truststore type. The type of the Java truststore.
+compression-level            6        The level of compression to compress event. 0 = no compression and 1-9 is compression.  The higher the number the more compression
+ssl                          false    Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a "truststore", "truststore-password", "truststore-type", and specify whether to "trust-all-certs".
+trust-all-certs              false    If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and "listen in" on the encrypted connection.
+truststore                   --       The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source's SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used.
+truststore-password          --       The password for the specified truststore.
+truststore-type              JKS      The type of the Java truststore. This can be "JKS" or other supported Java truststore type.
 ==========================   =======  ==============================================
 
 Example for agent named a1:

http://git-wip-us.apache.org/repos/asf/flume/blob/f49607c4/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
index 66be934..9aabdd4 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
@@ -69,7 +69,6 @@ import org.jboss.netty.handler.codec.compression.ZlibEncoder;
 import org.jboss.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import sun.security.validator.KeyStores;
 
 /**
  * Avro/Netty implementation of {@link RpcClient}.
@@ -666,25 +665,26 @@ implements RpcClient {
                 + " all server certificates");
             managers = new TrustManager[] { new PermissiveTrustManager() };
           } else {
-            InputStream truststoreStream = null;
-            if (truststore == null) {
-              truststoreType = "JKS";
-              truststoreStream = getClass().getClassLoader().getResourceAsStream("cacerts");
-              truststorePassword = "changeit";
-            } else {
-              truststoreStream = new FileInputStream(truststore);
+            KeyStore keystore = null;
+
+            if (truststore != null) {
+              if (truststorePassword == null) {
+                throw new NullPointerException("truststore password is null");
+              }
+              InputStream truststoreStream = new FileInputStream(truststore);
+              keystore = KeyStore.getInstance(truststoreType);
+              keystore.load(truststoreStream, truststorePassword.toCharArray());
             }
-            KeyStore keystore = KeyStore.getInstance(truststoreType);
-            keystore.load(truststoreStream, truststorePassword.toCharArray());
 
             TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
+            // null keystore is OK, with SunX509 it defaults to system CA Certs
+            // see http://docs.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#X509TrustManager
             tmf.init(keystore);
             managers = tmf.getTrustManagers();
           }
 
           SSLContext sslContext = SSLContext.getInstance("TLS");
-          sslContext.init(null, managers,
-                          null);
+          sslContext.init(null, managers, null);
           SSLEngine sslEngine = sslContext.createSSLEngine();
           sslEngine.setUseClientMode(true);
           // addFirst() will make SSL handling the first stage of decoding

Reply via email to