Repository: flume
Updated Branches:
  refs/heads/trunk 585c4c92e -> 2fd0d2572


FLUME-2971. Add secure Kafka Sink/Source/Channel setup to the User Guide

The User Guide already has details about configuring Kafka channel to work with a kerberized Kafka cluster.

This patch adds similar description for Kafka Sink and Kafka Source.

Reviewers: Tristan Stevens, Mike Percy, Bessenyei Balázs Donát

(Attila Simon via Bessenyei Balázs Donát)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2fd0d257
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2fd0d257
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2fd0d257

Branch: refs/heads/trunk
Commit: 2fd0d2572ceb2bc0138c880a3b763647349b64f4
Parents: 585c4c9
Author: Attila Simon <[email protected]>
Authored: Mon Oct 10 20:48:06 2016 +0200
Committer: Bessenyei Balázs Donát <[email protected]>
Committed: Mon Oct 10 20:48:06 2016 +0200

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst | 563 +++++++++++++++++++++++-----
 1 file changed, 471 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/2fd0d257/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 25db777..71fd32e 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1240,44 +1240,45 @@ Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics.
 If you have multiple Kafka sources running, you can configure them with the same Consumer Group
 so each will read a unique set of partitions for the topics.
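
As a sketch of the consumer-group behaviour described above (agent, source and topic names here are illustrative), two sources joining the same group split the topic's partitions between them:

.. code-block:: properties

    # Both sources join consumer group "flume-group", so Kafka assigns
    # each one a disjoint subset of mytopic's partitions.
    a1.sources.src1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.src1.kafka.bootstrap.servers = kafka-1:9092
    a1.sources.src1.kafka.topics = mytopic
    a1.sources.src1.kafka.consumer.group.id = flume-group

    a1.sources.src2.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.src2.kafka.bootstrap.servers = kafka-1:9092
    a1.sources.src2.kafka.topics = mytopic
    a1.sources.src2.kafka.consumer.group.id = flume-group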
 
-
-
-===============================  ===========  
===================================================
-Property Name                    Default      Description
-===============================  ===========  
===================================================
-**channels**                     --
-**type**                         --           The component type name, needs 
to be ``org.apache.flume.source.kafka.KafkaSource``
-**kafka.bootstrap.servers**      --           List of brokers in the Kafka 
cluster used by the source
-kafka.consumer.group.id          flume        Unique identified of consumer 
group. Setting the same id in multiple sources or agents
-                                              indicates that they are part of 
the same consumer group
-**kafka.topics**                 --           Comma-separated list of topics 
the kafka consumer will read messages from.
-**kafka.topics.regex**           --           Regex that defines set of topics 
the source is subscribed on. This property has higher priority
-                                              than ``kafka.topics`` and 
overrides ``kafka.topics`` if exists.
-batchSize                        1000         Maximum number of messages 
written to Channel in one batch
-batchDurationMillis              1000         Maximum time (in ms) before a 
batch will be written to Channel
-                                              The batch will be written 
whenever the first of size and time will be reached.
-backoffSleepIncrement            1000         Initial and incremental wait 
time that is triggered when a Kafka Topic appears to be empty.
-                                              Wait period will reduce 
aggressive pinging of an empty Kafka Topic.  One second is ideal for
-                                              ingestion use cases but a lower 
value may be required for low latency operations with
-                                              interceptors.
-maxBackoffSleep                  5000         Maximum wait time that is 
triggered when a Kafka Topic appears to be empty.  Five seconds is
-                                              ideal for ingestion use cases 
but a lower value may be required for low latency operations
-                                              with interceptors.
-useFlumeEventFormat              false        By default events are taken as 
bytes from the Kafka topic directly into the event body. Set to
-                                              true to read events as the Flume 
Avro binary format. Used in conjunction with the same property
-                                              on the KafkaSink or with the 
parseAsFlumeEvent property on the Kafka Channel this will preserve
-                                              any Flume headers sent on the 
producing side.
-migrateZookeeperOffsets          true         When no Kafka stored offset is 
found, look up the offsets in Zookeeper and commit them to Kafka.
-                                              This should be true to support 
seamless Kafka client migration from older versions of Flume.
-                                              Once migrated this can be set to 
false, though that should generally not be required.
-                                              If no Zookeeper offset is found, 
the Kafka configuration kafka.consumer.auto.offset.reset
-                                              defines how offsets are handled.
-Other Kafka Consumer Properties  --           These properties are used to 
configure the Kafka Consumer. Any producer property supported
-                                              by Kafka can be used. The only 
requirement is to prepend the property name with the prefix
-                                              ``kafka.consumer``.
-                                              For example: 
``kafka.consumer.auto.offset.reset``
-                                              Check `Kafka documentation 
<http://kafka.apache.org/documentation.html#newconsumerconfigs>`_ for details
-===============================  ===========  
===================================================
+==================================  ===========  ===================================================
+Property Name                       Default      Description
+==================================  ===========  ===================================================
+**channels**                        --
+**type**                            --           The component type name, needs to be ``org.apache.flume.source.kafka.KafkaSource``
+**kafka.bootstrap.servers**         --           List of brokers in the Kafka cluster used by the source
+kafka.consumer.group.id             flume        Unique identifier of the consumer group. Setting the same id in multiple sources or agents
+                                                 indicates that they are part of the same consumer group
+**kafka.topics**                    --           Comma-separated list of topics the Kafka consumer will read messages from.
+**kafka.topics.regex**              --           Regex that defines the set of topics the source is subscribed to. This property has higher priority
+                                                 than ``kafka.topics`` and overrides ``kafka.topics`` if it exists.
+batchSize                           1000         Maximum number of messages written to Channel in one batch
+batchDurationMillis                 1000         Maximum time (in ms) before a batch will be written to Channel.
+                                                 The batch will be written whenever the first of size and time is reached.
+backoffSleepIncrement               1000         Initial and incremental wait time that is triggered when a Kafka Topic appears to be empty.
+                                                 Wait period will reduce aggressive pinging of an empty Kafka Topic.  One second is ideal for
+                                                 ingestion use cases but a lower value may be required for low latency operations with
+                                                 interceptors.
+maxBackoffSleep                     5000         Maximum wait time that is triggered when a Kafka Topic appears to be empty.  Five seconds is
+                                                 ideal for ingestion use cases but a lower value may be required for low latency operations
+                                                 with interceptors.
+useFlumeEventFormat                 false        By default events are taken as bytes from the Kafka topic directly into the event body. Set to
+                                                 true to read events as the Flume Avro binary format. Used in conjunction with the same property
+                                                 on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel this will preserve
+                                                 any Flume headers sent on the producing side.
+migrateZookeeperOffsets             true         When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka.
+                                                 This should be true to support seamless Kafka client migration from older versions of Flume.
+                                                 Once migrated this can be set to false, though that should generally not be required.
+                                                 If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset
+                                                 defines how offsets are handled.
+                                                 Check `Kafka documentation <http://kafka.apache.org/documentation.html#newconsumerconfigs>`_ for details
+kafka.consumer.security.protocol    PLAINTEXT    Set to SASL_PLAINTEXT, SASL_SSL or SSL if reading from Kafka using some level of security. See below for additional info on secure setup.
+*more consumer security props*                   If using SASL_PLAINTEXT, SASL_SSL or SSL refer to `Kafka security <http://kafka.apache.org/documentation.html#security>`_ for additional
+                                                 properties that need to be set on the consumer.
+Other Kafka Consumer Properties     --           These properties are used to configure the Kafka Consumer. Any consumer property supported
+                                                 by Kafka can be used. The only requirement is to prepend the property name with the prefix
+                                                 ``kafka.consumer``.
+                                                 For example: ``kafka.consumer.auto.offset.reset``
+==================================  ===========  ===================================================
 
 .. note:: The Kafka Source overrides two Kafka consumer parameters:
          auto.commit.enable is set to "false" by the source and every batch is committed. Kafka source guarantees at least once
@@ -1319,6 +1320,142 @@ Example for topic subscription by regex
     # the default kafka.consumer.group.id=flume is used
 
 
+**Security and Kafka Source:**
+
+Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka.
+For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.
+
+As of now data encryption is solely provided by SSL/TLS.
+
+Setting ``kafka.consumer.security.protocol`` to any of the following values means:
+
+- **SASL_PLAINTEXT** - Kerberos or plaintext authentication with no data 
encryption
+- **SASL_SSL** - Kerberos or plaintext authentication with data encryption
+- **SSL** - TLS based encryption with optional authentication.
+
+.. warning::
+    There is a performance degradation when SSL is enabled,
+    the magnitude of which depends on the CPU type and the JVM implementation.
+    Reference: `Kafka security overview <http://kafka.apache.org/documentation#security_overview>`_
+    and the jira for tracking this issue:
+    `KAFKA-2561 <https://issues.apache.org/jira/browse/KAFKA-2561>`_
+
+
+**TLS and Kafka Source:**
+
+Please read the steps described in `Configuring Kafka Clients SSL <http://kafka.apache.org/documentation#security_configclients>`_
+to learn about additional configuration settings for fine tuning, for example any of the following:
+security provider, cipher suites, enabled protocols, truststore or keystore types.
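+
+For instance, a sketch of such fine tuning (these are standard Kafka client properties passed through with the ``kafka.consumer.`` prefix; the agent/source names and values are illustrative):
+
+.. code-block:: properties
+
+    # Hypothetical fine-tuning: pin the TLS protocol version and declare
+    # the truststore type for this source's consumer.
+    a1.sources.source1.kafka.consumer.ssl.enabled.protocols = TLSv1.2
+    a1.sources.source1.kafka.consumer.ssl.truststore.type = JKS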
+
+Example configuration with server side authentication and data encryption.
+
+.. code-block:: properties
+
+    a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
+    a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.sources.source1.kafka.topics = mytopic
+    a1.sources.source1.kafka.consumer.group.id = flume-consumer
+    a1.sources.source1.kafka.consumer.security.protocol = SSL
+    a1.sources.source1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
+    a1.sources.source1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
+
+
+Note: By default the property ``ssl.endpoint.identification.algorithm``
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following property:
+
+.. code-block:: properties
+
+    a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS
+
+Once enabled, clients will verify the server's fully qualified domain name (FQDN)
+against one of the following two fields:
+
+#) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
+#) Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6
+
+If client side authentication is also required then the following should additionally be added to the Flume agent configuration.
+Each Flume agent has to have its own client certificate which has to be trusted by the Kafka brokers either
+individually or by their signature chain. A common example is to sign each client certificate by a single Root CA
+which in turn is trusted by the Kafka brokers.
+
+.. code-block:: properties
+
+    a1.sources.source1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks
+    a1.sources.source1.kafka.consumer.ssl.keystore.password = <password to access the keystore>
+
+If the keystore and key use different password protection then the ``ssl.key.password`` property will
+provide the required additional secret for the consumer keystore:
+
+.. code-block:: properties
+
+    a1.sources.source1.kafka.consumer.ssl.key.password = <password to access the key>
+
+
+**Kerberos and Kafka Source:**
+
+To use Kafka source with a Kafka cluster secured with Kerberos, set the ``kafka.consumer.security.protocol`` property noted above for the consumer.
+The Kerberos keytab and principal to be used with Kafka brokers are specified in a JAAS file's "KafkaClient" section. The "Client" section describes the Zookeeper connection, if needed.
+See `Kafka doc <http://kafka.apache.org/documentation.html#security_sasl_clientconfig>`_
+for information on the JAAS file contents. The location of this JAAS file and optionally the system-wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:
+
+.. code-block:: properties
+
+    JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
+    JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
+
+Example secure configuration using SASL_PLAINTEXT:
+
+.. code-block:: properties
+
+    a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
+    a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.sources.source1.kafka.topics = mytopic
+    a1.sources.source1.kafka.consumer.group.id = flume-consumer
+    a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
+    a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
+    a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
+
+Example secure configuration using SASL_SSL:
+
+.. code-block:: properties
+
+    a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
+    a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.sources.source1.kafka.topics = mytopic
+    a1.sources.source1.kafka.consumer.group.id = flume-consumer
+    a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
+    a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
+    a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
+    a1.sources.source1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
+    a1.sources.source1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
+
+
+Sample JAAS file. For reference on its content please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN)
+in the Kafka documentation of `SASL configuration <http://kafka.apache.org/documentation#security_sasl_clientconfig>`_.
+Since the Kafka Source may also connect to Zookeeper for offset migration, the "Client" section was also added to this example.
+This won't be needed unless you require offset migration, or you require this section for other secure components.
+Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.
+
+.. code-block:: javascript
+
+    Client {
+      com.sun.security.auth.module.Krb5LoginModule required
+      useKeyTab=true
+      storeKey=true
+      keyTab="/path/to/keytabs/flume.keytab"
+      principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
+    };
+
+    KafkaClient {
+      com.sun.security.auth.module.Krb5LoginModule required
+      useKeyTab=true
+      storeKey=true
+      keyTab="/path/to/keytabs/flume.keytab"
+      principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
+    };
+
+
 NetCat Source
 ~~~~~~~~~~~~~
 
@@ -2564,38 +2701,41 @@ This version of Flume no longer supports Older Versions (0.8.x) of Kafka.
 Required properties are marked in bold font.
 
 
-===============================  ===================  
=============================================================================================
-Property Name                    Default              Description
-===============================  ===================  
=============================================================================================
-**type**                         --                   Must be set to 
``org.apache.flume.sink.kafka.KafkaSink``
-**kafka.bootstrap.servers**      --                   List of brokers 
Kafka-Sink will connect to, to get the list of topic partitions
-                                                      This can be a partial 
list of brokers, but we recommend at least two for HA.
-                                                      The format is comma 
separated list of hostname:port
-kafka.topic                      default-flume-topic  The topic in Kafka to 
which the messages will be published. If this parameter is configured,
-                                                      messages will be 
published to this topic.
-                                                      If the event header 
contains a "topic" field, the event will be published to that topic
-                                                      overriding the topic 
configured here.
-flumeBatchSize                   100                  How many messages to 
process in one batch. Larger batches improve throughput while adding latency.
-kafka.producer.acks              1                    How many replicas must 
acknowledge a message before its considered successfully written.
-                                                      Accepted values are 0 
(Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all 
replicas)
-                                                      Set this to -1 to avoid 
data loss in some cases of leader failure.
-useFlumeEventFormat              false                By default events are 
put as bytes onto the Kafka topic directly from the event body. Set to
-                                                      true to store events as 
the Flume Avro binary format. Used in conjunction with the same property
-                                                      on the KafkaSource or 
with the parseAsFlumeEvent property on the Kafka Channel this will preserve
-                                                      any Flume headers for 
the producing side.
-defaultPartitionId               --                   Specifies a Kafka 
partition ID (integer) for all events in this channel to be sent to, unless
-                                                      overriden by 
``partitionIdHeader``. By default, if this property is not set, events will be
-                                                      distributed by the Kafka 
Producer's partitioner - including by ``key`` if specified (or by a
-                                                      partitioner specified by 
``kafka.partitioner.class``).
-partitionIdHeader                --                   When set, the sink will 
take the value of the field named using the value of this property
-                                                      from the event header 
and send the message to the specified partition of the topic. If the
-                                                      value represents an 
invalid partition, an EventDeliveryException will be thrown. If the header value
-                                                      is present then this 
setting overrides ``defaultPartitionId``.
-Other Kafka Producer Properties  --                   These properties are 
used to configure the Kafka Producer. Any producer property supported
-                                                      by Kafka can be used. 
The only requirement is to prepend the property name with the prefix
-                                                      ``kafka.producer``.
-                                                      For example: 
kafka.producer.linger.ms
-===============================  ===================  
=============================================================================================
+==================================  ===================  =============================================================================================
+Property Name                       Default              Description
+==================================  ===================  =============================================================================================
+**type**                            --                   Must be set to ``org.apache.flume.sink.kafka.KafkaSink``
+**kafka.bootstrap.servers**         --                   List of brokers Kafka-Sink will connect to, to get the list of topic partitions
+                                                         This can be a partial list of brokers, but we recommend at least two for HA.
+                                                         The format is a comma-separated list of hostname:port
+kafka.topic                         default-flume-topic  The topic in Kafka to which the messages will be published. If this parameter is configured,
+                                                         messages will be published to this topic.
+                                                         If the event header contains a "topic" field, the event will be published to that topic
+                                                         overriding the topic configured here.
+flumeBatchSize                      100                  How many messages to process in one batch. Larger batches improve throughput while adding latency.
+kafka.producer.acks                 1                    How many replicas must acknowledge a message before it's considered successfully written.
+                                                         Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas)
+                                                         Set this to -1 to avoid data loss in some cases of leader failure.
+useFlumeEventFormat                 false                By default events are put as bytes onto the Kafka topic directly from the event body. Set to
+                                                         true to store events as the Flume Avro binary format. Used in conjunction with the same property
+                                                         on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel this will preserve
+                                                         any Flume headers for the producing side.
+defaultPartitionId                  --                   Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless
+                                                         overridden by ``partitionIdHeader``. By default, if this property is not set, events will be
+                                                         distributed by the Kafka Producer's partitioner - including by ``key`` if specified (or by a
+                                                         partitioner specified by ``kafka.partitioner.class``).
+partitionIdHeader                   --                   When set, the sink will take the value of the field named using the value of this property
+                                                         from the event header and send the message to the specified partition of the topic. If the
+                                                         value represents an invalid partition, an EventDeliveryException will be thrown. If the header value
+                                                         is present then this setting overrides ``defaultPartitionId``.
+kafka.producer.security.protocol    PLAINTEXT            Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
+*more producer security props*                           If using SASL_PLAINTEXT, SASL_SSL or SSL refer to `Kafka security <http://kafka.apache.org/documentation.html#security>`_ for additional
+                                                         properties that need to be set on the producer.
+Other Kafka Producer Properties     --                   These properties are used to configure the Kafka Producer. Any producer property supported
+                                                         by Kafka can be used. The only requirement is to prepend the property name with the prefix
+                                                         ``kafka.producer``.
+                                                         For example: ``kafka.producer.linger.ms``
+==================================  ===================  =============================================================================================
 
.. note::   Kafka Sink uses the ``topic`` and ``key`` properties from the FlumeEvent headers to send events to Kafka.
            If ``topic`` exists in the headers, the event will be sent to that specific topic, overriding the topic configured for the Sink.
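
One way to exercise this (a sketch; the interceptor setup and header value are illustrative) is to stamp a ``topic`` header on events with Flume's static interceptor:

.. code-block:: properties

    # Every event passing through source r1 gets a "topic" header,
    # which Kafka Sink honours over its configured kafka.topic.
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = topic
    a1.sources.r1.interceptors.i1.value = audit-events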
@@ -2635,6 +2775,132 @@ argument.
     a1.sinks.k1.kafka.producer.linger.ms = 1
    a1.sinks.k1.kafka.producer.compression.type = snappy
 
+
+**Security and Kafka Sink:**
+
+Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka.
+For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.
+
+As of now data encryption is solely provided by SSL/TLS.
+
+Setting ``kafka.producer.security.protocol`` to any of the following values means:
+
+- **SASL_PLAINTEXT** - Kerberos or plaintext authentication with no data 
encryption
+- **SASL_SSL** - Kerberos or plaintext authentication with data encryption
+- **SSL** - TLS based encryption with optional authentication.
+
+.. warning::
+    There is a performance degradation when SSL is enabled,
+    the magnitude of which depends on the CPU type and the JVM implementation.
+    Reference: `Kafka security overview <http://kafka.apache.org/documentation#security_overview>`_
+    and the jira for tracking this issue:
+    `KAFKA-2561 <https://issues.apache.org/jira/browse/KAFKA-2561>`__
+
+
+**TLS and Kafka Sink:**
+
+Please read the steps described in `Configuring Kafka Clients SSL <http://kafka.apache.org/documentation#security_configclients>`_
+to learn about additional configuration settings for fine tuning, for example any of the following:
+security provider, cipher suites, enabled protocols, truststore or keystore types.
+
+Example configuration with server side authentication and data encryption.
+
+.. code-block:: properties
+
+    a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
+    a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.sinks.sink1.kafka.topic = mytopic
+    a1.sinks.sink1.kafka.producer.security.protocol = SSL
+    a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
+    a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>
+
+
+Note: By default the property ``ssl.endpoint.identification.algorithm``
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following property:
+
+.. code-block:: properties
+
+    a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
+
+Once enabled, clients will verify the server's fully qualified domain name (FQDN)
+against one of the following two fields:
+
+#) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
+#) Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6
+
+If client side authentication is also required then the following should additionally be added to the Flume agent configuration.
+Each Flume agent has to have its own client certificate which has to be trusted by the Kafka brokers either
+individually or by their signature chain. A common example is to sign each client certificate by a single Root CA
+which in turn is trusted by the Kafka brokers.
+
+.. code-block:: properties
+
+    a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
+    a1.sinks.sink1.kafka.producer.ssl.keystore.password = <password to access the keystore>
+
+If the keystore and key use different password protection then the ``ssl.key.password`` property will
+provide the required additional secret for the producer keystore:
+
+.. code-block:: properties
+
+    a1.sinks.sink1.kafka.producer.ssl.key.password = <password to access the key>
+
+
+**Kerberos and Kafka Sink:**
+
+To use Kafka sink with a Kafka cluster secured with Kerberos, set the ``kafka.producer.security.protocol`` property noted above for the producer.
+The Kerberos keytab and principal to be used with Kafka brokers are specified in a JAAS file's "KafkaClient" section. The "Client" section describes the Zookeeper connection, if needed.
+See `Kafka doc <http://kafka.apache.org/documentation.html#security_sasl_clientconfig>`_
+for information on the JAAS file contents. The location of this JAAS file and optionally the system-wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:
+
+.. code-block:: properties
+
+    JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
+    JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
+
+Example secure configuration using SASL_PLAINTEXT:
+
+.. code-block:: properties
+
+    a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
+    a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.channels.channel1.kafka.topic = channel1
+    a1.channels.channel1.kafka.producer.security.protocol = SASL_PLAINTEXT
+    a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
+    a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
+
+
+Example secure configuration using SASL_SSL:
+
+.. code-block:: properties
+
+    a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
+    a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.channels.channel1.kafka.topic = channel1
+    a1.channels.channel1.kafka.producer.security.protocol = SASL_SSL
+    a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
+    a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
+    a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
+    a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
+
+
+Sample JAAS file. For a reference of its content, please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN)
+in the Kafka documentation of `SASL configuration <http://kafka.apache.org/documentation#security_sasl_clientconfig>`_.
+Unlike with the Kafka Source or Kafka Channel, a "Client" section is not required, unless it is needed by other connecting components. Also, please make sure
+that the operating system user of the Flume processes has read privileges on the JAAS and keytab files.
+
+.. code-block:: javascript
+
+    KafkaClient {
+      com.sun.security.auth.module.Krb5LoginModule required
+      useKeyTab=true
+      storeKey=true
+      keyTab="/path/to/keytabs/flume.keytab"
+      principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
+    };
+
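+Restricting the read privileges mentioned above could be done along these lines (a sketch; the service
+user name ``flume`` and the paths are assumptions):
+
+.. code-block:: bash
+
+    # Allow only the Flume service user to read the JAAS file and the keytab
+    chown flume: /path/to/flume_jaas.conf /path/to/keytabs/flume.keytab
+    chmod 400 /path/to/flume_jaas.conf /path/to/keytabs/flume.keytab
+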
+
 Custom Sink
 ~~~~~~~~~~~
 
@@ -2792,7 +3058,7 @@ migrateZookeeperOffsets                  true                        When no Kaf
                                                                      This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set
                                                                      to false, though that should generally not be required. If no Zookeeper offset is found the kafka.consumer.auto.offset.reset
                                                                      configuration defines how offsets are handled.
-pollTimeout                              500                         The amount of time(in milliseconds) to wait in the "poll()" call of the conumer.
+pollTimeout                              500                         The amount of time(in milliseconds) to wait in the "poll()" call of the consumer.
                                                                      https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
 defaultPartitionId                       --                          Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless
                                                                      overriden by ``partitionIdHeader``. By default, if this property is not set, events will be
@@ -2808,9 +3074,9 @@ kafka.consumer.auto.offset.reset         latest                       What to do
                                                                      latest: automatically reset the offset to the latest offset
                                                                      none: throw exception to the consumer if no previous offset is found for the consumer\'s group
                                                                      anything else: throw exception to the consumer.
-kafka.producer.security.protocol         PLAINTEXT                   Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using Kerberos. See below for additional info on Kerberos setup.
+kafka.producer.security.protocol         PLAINTEXT                   Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
 kafka.consumer.security.protocol         PLAINTEXT                   Same as kafka.producer.security.protocol but for reading/consuming from Kafka.
-*more producer/consumer security props*                              If using SASL_SSL or SSL, refer to `Kafka security <http://kafka.apache.org/documentation.html#security>`_ for additional
+*more producer/consumer security props*                              If using SASL_PLAINTEXT, SASL_SSL or SSL refer to `Kafka security <http://kafka.apache.org/documentation.html#security>`_ for additional
                                                                      properties that need to be set on producer/consumer.
 =======================================  ==========================  ===============================================================================================================
 
@@ -2839,43 +3105,156 @@ Example for agent named a1:
     a1.channels.channel1.kafka.topic = channel1
     a1.channels.channel1.kafka.consumer.group.id = flume-consumer
 
+
+**Security and Kafka Channel:**
+
+Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka.
+For secure authentication, SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0 onwards.
+
+As of now, data encryption is solely provided by SSL/TLS.
+
+Setting ``kafka.producer|consumer.security.protocol`` to any of the following values means:
+
+- **SASL_PLAINTEXT** - Kerberos or plaintext authentication with no data encryption
+- **SASL_SSL** - Kerberos or plaintext authentication with data encryption
+- **SSL** - TLS based encryption with optional authentication.
+
+.. warning::
+    There is a performance degradation when SSL is enabled,
+    the magnitude of which depends on the CPU type and the JVM implementation.
+    Reference: `Kafka security overview <http://kafka.apache.org/documentation#security_overview>`_
+    and the JIRA for tracking this issue:
+    `KAFKA-2561 <https://issues.apache.org/jira/browse/KAFKA-2561>`_
+
+
+**TLS and Kafka Channel:**
+
+Please read the steps described in `Configuring Kafka Clients SSL <http://kafka.apache.org/documentation#security_configclients>`_
+to learn about additional configuration settings for fine tuning, for example, any of the following:
+security provider, cipher suites, enabled protocols, truststore or keystore types.
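+
+For instance, such fine tuning could look like the following (the property names come from the Kafka
+client configuration, while the values are illustrative assumptions, not recommendations):
+
+.. code-block:: properties
+
+    a1.channels.channel1.kafka.producer.ssl.enabled.protocols = TLSv1.2
+    a1.channels.channel1.kafka.producer.ssl.truststore.type = JKS
+    a1.channels.channel1.kafka.producer.ssl.cipher.suites = TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256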
+
+Example configuration with server side authentication and data encryption:
+
+.. code-block:: properties
+
+    a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
+    a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.channels.channel1.kafka.topic = channel1
+    a1.channels.channel1.kafka.consumer.group.id = flume-consumer
+    a1.channels.channel1.kafka.producer.security.protocol = SSL
+    a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
+    a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
+    a1.channels.channel1.kafka.consumer.security.protocol = SSL
+    a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
+    a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
+
+
+Note: By default the property ``ssl.endpoint.identification.algorithm``
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following properties:
+
+.. code-block:: properties
+
+    a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
+    a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS
+
+Once enabled, clients will verify the server's fully qualified domain name (FQDN)
+against one of the following two fields:
+
+#) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
+#) Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6
+
+If client side authentication is also required, then the following should additionally be added to the Flume agent configuration.
+Each Flume agent needs its own client certificate, which has to be trusted by the Kafka brokers either
+individually or by its signature chain. A common approach is to sign each client certificate with a single Root CA,
+which in turn is trusted by the Kafka brokers.
+
+.. code-block:: properties
+
+    a1.channels.channel1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
+    a1.channels.channel1.kafka.producer.ssl.keystore.password = <password to access the keystore>
+    a1.channels.channel1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks
+    a1.channels.channel1.kafka.consumer.ssl.keystore.password = <password to access the keystore>
+
+If the keystore and the key use different passwords, then the ``ssl.key.password`` property
+provides the required additional secret for both the consumer and producer keystores:
+
+.. code-block:: properties
+
+    a1.channels.channel1.kafka.producer.ssl.key.password = <password to access the key>
+    a1.channels.channel1.kafka.consumer.ssl.key.password = <password to access the key>
+
+
 **Kerberos and Kafka Channel:**
 
-To use Kafka channel with a Kafka cluster secured with Kerberos, set the producer/consumer.security.protocol properties noted above for producer and/or consumer.
-The Kerberos keytab and principal to be used is specified in a JAAS file's "KafkaClient" section. See `Kafka doc <http://kafka.apache.org/documentation.html#security_sasl_clientconfig>`_
-for info on the JAAS file contents. The location of this JAAS file is specified via JAVA_OPTS using -Djava.security.auth.login.config=/path/to/kafka_jaas.conf (in flume-env.sh)
+To use Kafka channel with a Kafka cluster secured with Kerberos, set the ``producer/consumer.security.protocol`` properties noted above for the producer and/or consumer.
+The Kerberos keytab and principal to be used with the Kafka brokers are specified in a JAAS file's "KafkaClient" section. The "Client" section describes the Zookeeper connection, if needed.
+See the `Kafka doc <http://kafka.apache.org/documentation.html#security_sasl_clientconfig>`_
+for information on the JAAS file contents. The location of this JAAS file and, optionally, the system-wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:
+
+.. code-block:: properties
 
+    JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
+    JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
 
-Sample secure configuration using SASL_PLAINTEXT.
+Example secure configuration using SASL_PLAINTEXT:
 
 .. code-block:: properties
 
     a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
-    a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
+    a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
     a1.channels.channel1.kafka.topic = channel1
     a1.channels.channel1.kafka.consumer.group.id = flume-consumer
     a1.channels.channel1.kafka.producer.security.protocol = SASL_PLAINTEXT
+    a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
+    a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
     a1.channels.channel1.kafka.consumer.security.protocol = SASL_PLAINTEXT
+    a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
+    a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
 
-Sample JAAS file
+Example secure configuration using SASL_SSL:
 
-.. code-block:: javascript
+.. code-block:: properties
 
-    KafkaClient {
-        com.sun.security.auth.module.Krb5LoginModule required
-        useKeyTab=true
-        storeKey=true
-        serviceName="kafka"
-        keyTab="/path/to/keytabs/testuser1.keytab"
-        principal="testuser1/kafka1.example.com";
-    };
+    a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
+    a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.channels.channel1.kafka.topic = channel1
+    a1.channels.channel1.kafka.consumer.group.id = flume-consumer
+    a1.channels.channel1.kafka.producer.security.protocol = SASL_SSL
+    a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
+    a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
+    a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
+    a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
+    a1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL
+    a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
+    a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
+    a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
+    a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
+
+
+Sample JAAS file. For a reference of its content, please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN)
+in the Kafka documentation of `SASL configuration <http://kafka.apache.org/documentation#security_sasl_clientconfig>`_.
+Since the Kafka Channel may also connect to Zookeeper for offset migration, the "Client" section was also added to this example.
+This won't be needed unless you require offset migration, or you require this section for other secure components.
+Also, please make sure that the operating system user of the Flume processes has read privileges on the JAAS and keytab files.
 
-Sample flume-env.sh
+.. code-block:: javascript
 
-.. code-block:: properties
+    Client {
+      com.sun.security.auth.module.Krb5LoginModule required
+      useKeyTab=true
+      storeKey=true
+      keyTab="/path/to/keytabs/flume.keytab"
+      principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
+    };
 
-    export JAVA_HOME=/path/java-home/
-    export JAVA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_jaas.conf"
+    KafkaClient {
+      com.sun.security.auth.module.Krb5LoginModule required
+      useKeyTab=true
+      storeKey=true
+      keyTab="/path/to/keytabs/flume.keytab"
+      principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
+    };
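+
+The keytab and principal in the JAAS file can be sanity checked with the standard Kerberos tools
+(a sketch; the availability of ``klist``/``kinit`` and the paths are assumptions):
+
+.. code-block:: bash
+
+    # List the principals stored in the keytab
+    klist -kt /path/to/keytabs/flume.keytab
+    # Verify that a ticket can actually be obtained with the keytab
+    kinit -kt /path/to/keytabs/flume.keytab flume/flumehost1.example.com@YOURKERBEROSREALM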
 
 
 File Channel
