Modified: flume/site/trunk/content/sphinx/FlumeUserGuide.rst
URL: 
http://svn.apache.org/viewvc/flume/site/trunk/content/sphinx/FlumeUserGuide.rst?rev=1765259&r1=1765258&r2=1765259&view=diff
==============================================================================
--- flume/site/trunk/content/sphinx/FlumeUserGuide.rst (original)
+++ flume/site/trunk/content/sphinx/FlumeUserGuide.rst Mon Oct 17 12:34:04 2016
@@ -15,7 +15,7 @@
 
 
 ======================================
-Flume 1.6.0 User Guide
+Flume 1.7.0 User Guide
 ======================================
 
 Introduction
@@ -28,29 +28,29 @@ Apache Flume is a distributed, reliable,
 collecting, aggregating and moving large amounts of log data from many
 different sources to a centralized data store.
 
-The use of Apache Flume is not only restricted to log data aggregation. 
+The use of Apache Flume is not only restricted to log data aggregation.
 Since data sources are customizable, Flume can be used to transport massive 
quantities
-of event data including but not limited to network traffic data, 
social-media-generated data, 
+of event data including but not limited to network traffic data, 
social-media-generated data,
 email messages and pretty much any data source possible.
 
 Apache Flume is a top level project at the Apache Software Foundation.
 
 There are currently two release code lines available, versions 0.9.x and 1.x.
 
-Documentation for the 0.9.x track is available at 
+Documentation for the 0.9.x track is available at
 `the Flume 0.9.x User Guide 
<http://archive.cloudera.com/cdh/3/flume/UserGuide/>`_.
 
 This documentation applies to the 1.7.x track.
 
-New and existing users are encouraged to use the 1.x releases so as to 
-leverage the performance improvements and configuration flexibilities 
available 
+New and existing users are encouraged to use the 1.x releases so as to
+leverage the performance improvements and configuration flexibilities available
 in the latest architecture.
 
 
 System Requirements
 -------------------
 
-#. Java Runtime Environment - Java 1.6 or later (Java 1.7 Recommended)
+#. Java Runtime Environment - Java 1.7 or later
 #. Memory - Sufficient memory for configurations used by sources, channels or 
sinks
 #. Disk Space - Sufficient disk space for configurations used by channels or 
sinks
 #. Directory Permissions - Read/Write permissions for directories used by agent
@@ -234,6 +234,36 @@ The original Flume terminal will output
 
 Congratulations - you've successfully configured and deployed a Flume agent! 
Subsequent sections cover agent configuration in much more detail.
 
+Logging raw data
+~~~~~~~~~~~~~~~~
+
+
+Logging the raw stream of data flowing through the ingest pipeline is not 
desired behaviour in
+many production environments because this may result in leaking sensitive data 
or security related
+configurations, such as secret keys, to Flume log files.
+By default, Flume will not log such information. On the other hand, if the 
data pipeline is broken,
+Flume will attempt to provide clues for debugging the problem.
+
+One way to debug problems with event pipelines is to set up an additional 
`Memory Channel`_
+connected to a `Logger Sink`_, which will output all event data to the Flume 
logs.
+In some situations, however, this approach is insufficient.
+
+In order to enable logging of event- and configuration-related data, some Java 
system properties
+must be set in addition to log4j properties.
+
+To enable configuration-related logging, set the Java system property
+``-Dorg.apache.flume.log.printconfig=true``. This can either be passed on the 
command line or by
+setting this in the ``JAVA_OPTS`` variable in *flume-env.sh*.
+
+To enable data logging, set the Java system property 
``-Dorg.apache.flume.log.rawdata=true``
+in the same way described above. For most components, the log4j logging level 
must also be set to
+DEBUG or TRACE to make event-specific logging appear in the Flume logs.
+
+Here is an example of enabling both configuration logging and raw data logging 
while also
+setting the Log4j loglevel to DEBUG for console output::
+
+  $ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 
-Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true 
-Dorg.apache.flume.log.rawdata=true
+
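+The same effect can be achieved without touching the command line by appending the flags to the
+``JAVA_OPTS`` variable in *flume-env.sh*. A minimal sketch, assuming a stock *flume-env.sh*
+(merge with whatever your file already defines)::
+
+  JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.printconfig=true"
+  JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true"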
 
 Zookeeper based Configuration
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -900,7 +930,7 @@ Property Name               Default
 **channels**                --
 **type**                    --           The component type name, needs to be 
``jms``
 **initialContextFactory**   --           Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory
-**connectionFactory**       --           The JNDI name the connection factory 
shoulld appear as
+**connectionFactory**       --           The JNDI name the connection factory 
should appear as
 **providerURL**             --           The JMS provider URL
 **destinationName**         --           Destination name
 **destinationType**         --           Destination type (queue or topic)
@@ -976,48 +1006,60 @@ Despite the reliability guarantees of th
 cases in which events may be duplicated if certain downstream failures occur.
 This is consistent with the guarantees offered by other Flume components.
 
-====================  ==============  
==========================================================
-Property Name         Default         Description
-====================  ==============  
==========================================================
-**channels**          --
-**type**              --              The component type name, needs to be 
``spooldir``.
-**spoolDir**          --              The directory from which to read files 
from.
-fileSuffix            .COMPLETED      Suffix to append to completely ingested 
files
-deletePolicy          never           When to delete completed files: 
``never`` or ``immediate``
-fileHeader            false           Whether to add a header storing the 
absolute path filename.
-fileHeaderKey         file            Header key to use when appending 
absolute path filename to event header.
-basenameHeader        false           Whether to add a header storing the 
basename of the file.
-basenameHeaderKey     basename        Header Key to use when appending  
basename of file to event header.
-ignorePattern         ^$              Regular expression specifying which 
files to ignore (skip)
-trackerDir            .flumespool     Directory to store metadata related to 
processing of files.
-                                      If this path is not an absolute path, 
then it is interpreted as relative to the spoolDir.
-consumeOrder          oldest          In which order files in the spooling 
directory will be consumed ``oldest``,
-                                      ``youngest`` and ``random``. In case of 
``oldest`` and ``youngest``, the last modified
-                                      time of the files will be used to 
compare the files. In case of a tie, the file
-                                      with smallest laxicographical order will 
be consumed first. In case of ``random`` any
-                                      file will be picked randomly. When using 
``oldest`` and ``youngest`` the whole
-                                      directory will be scanned to pick the 
oldest/youngest file, which might be slow if there
-                                      are a large number of files, while using 
``random`` may cause old files to be consumed
-                                      very late if new files keep coming in 
the spooling directory.
-maxBackoff            4000            The maximum time (in millis) to wait 
between consecutive attempts to write to the channel(s) if the channel is full. 
The source will start at a low backoff and increase it exponentially each time 
the channel throws a ChannelException, upto the value specified by this 
parameter.
-batchSize             100             Granularity at which to batch transfer 
to the channel
-inputCharset          UTF-8           Character set used by deserializers that 
treat the input file as text.
-decodeErrorPolicy     ``FAIL``        What to do when we see a non-decodable 
character in the input file.
-                                      ``FAIL``: Throw an exception and fail to 
parse the file.
-                                      ``REPLACE``: Replace the unparseable 
character with the "replacement character" char,
-                                      typically Unicode U+FFFD.
-                                      ``IGNORE``: Drop the unparseable 
character sequence.
-deserializer          ``LINE``        Specify the deserializer used to parse 
the file into events.
-                                      Defaults to parsing each line as an 
event. The class specified must implement
-                                      ``EventDeserializer.Builder``.
-deserializer.*                        Varies per event deserializer.
-bufferMaxLines        --              (Obselete) This option is now ignored.
-bufferMaxLineLength   5000            (Deprecated) Maximum length of a line in 
the commit buffer. Use deserializer.maxLineLength instead.
-selector.type         replicating     replicating or multiplexing
-selector.*                            Depends on the selector.type value
-interceptors          --              Space-separated list of interceptors
+========================  ==============  
==========================================================
+Property Name             Default         Description
+========================  ==============  
==========================================================
+**channels**              --
+**type**                  --              The component type name, needs to be 
``spooldir``.
+**spoolDir**              --              The directory from which to read files.
+fileSuffix                .COMPLETED      Suffix to append to completely 
ingested files
+deletePolicy              never           When to delete completed files: 
``never`` or ``immediate``
+fileHeader                false           Whether to add a header storing the 
absolute path filename.
+fileHeaderKey             file            Header key to use when appending 
absolute path filename to event header.
+basenameHeader            false           Whether to add a header storing the 
basename of the file.
+basenameHeaderKey         basename        Header Key to use when appending  
basename of file to event header.
+includePattern            ^.*$            Regular expression specifying which files to include.
+                                          It can be used together with ``ignorePattern``.
+                                          If a file matches both the ``ignorePattern`` and ``includePattern`` regex,
+                                          the file is ignored.
+ignorePattern             ^$              Regular expression specifying which files to ignore (skip).
+                                          It can be used together with ``includePattern``.
+                                          If a file matches both the ``ignorePattern`` and ``includePattern`` regex,
+                                          the file is ignored.
+trackerDir                .flumespool     Directory to store metadata related 
to processing of files.
+                                          If this path is not an absolute 
path, then it is interpreted as relative to the spoolDir.
+consumeOrder              oldest          In which order files in the spooling 
directory will be consumed ``oldest``,
+                                          ``youngest`` and ``random``. In case 
of ``oldest`` and ``youngest``, the last modified
+                                          time of the files will be used to 
compare the files. In case of a tie, the file
+                                          with smallest lexicographical order 
will be consumed first. In case of ``random`` any
+                                          file will be picked randomly. When 
using ``oldest`` and ``youngest`` the whole
+                                          directory will be scanned to pick 
the oldest/youngest file, which might be slow if there
+                                          are a large number of files, while 
using ``random`` may cause old files to be consumed
+                                          very late if new files keep coming 
in the spooling directory.
+pollDelay                 500             Delay (in milliseconds) used when 
polling for new files.
+recursiveDirectorySearch  false           Whether to monitor sub directories 
for new files to read.
+maxBackoff                4000            The maximum time (in millis) to wait between consecutive attempts to
+                                          write to the channel(s) if the channel is full. The source will start at
+                                          a low backoff and increase it exponentially each time the channel throws a
+                                          ChannelException, up to the value specified by this parameter.
+batchSize                 100             Granularity at which to batch 
transfer to the channel
+inputCharset              UTF-8           Character set used by deserializers 
that treat the input file as text.
+decodeErrorPolicy         ``FAIL``        What to do when we see a 
non-decodable character in the input file.
+                                          ``FAIL``: Throw an exception and 
fail to parse the file.
+                                          ``REPLACE``: Replace the unparseable 
character with the "replacement character" char,
+                                          typically Unicode U+FFFD.
+                                          ``IGNORE``: Drop the unparseable 
character sequence.
+deserializer              ``LINE``        Specify the deserializer used to 
parse the file into events.
+                                          Defaults to parsing each line as an 
event. The class specified must implement
+                                          ``EventDeserializer.Builder``.
+deserializer.*                            Varies per event deserializer.
+bufferMaxLines            --              (Obsolete) This option is now ignored.
+bufferMaxLineLength       5000            (Deprecated) Maximum length of a 
line in the commit buffer. Use deserializer.maxLineLength instead.
+selector.type             replicating     replicating or multiplexing
+selector.*                                Depends on the selector.type value
+interceptors              --              Space-separated list of interceptors
 interceptors.*
-====================  ==============  
==========================================================
+========================  ==============  
==========================================================
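+
+For instance, the new ``includePattern`` and ``ignorePattern`` properties can be combined so that
+only completed log files are picked up. A minimal sketch (the agent name, directory and patterns
+below are hypothetical):
+
+.. code-block:: properties
+
+  agent-1.sources.src-1.type = spooldir
+  agent-1.sources.src-1.channels = ch-1
+  agent-1.sources.src-1.spoolDir = /var/log/apps/spool
+  # pick up files ending in .log ...
+  agent-1.sources.src-1.includePattern = ^.*\.log$
+  # ... but skip anything parked under a temporary name
+  agent-1.sources.src-1.ignorePattern = ^.*\.tmp$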
 
 Example for an agent named agent-1:
 
@@ -1090,16 +1132,76 @@ Property Name               Default
 deserializer.maxBlobLength  100000000           The maximum number of bytes to 
read and buffer for a given request
 ==========================  ==================  
=======================================================================
 
+Taildir Source
+~~~~~~~~~~~~~~~~~~~~~~~~~
+.. note:: **This source is provided as a preview feature. It does not work on 
Windows.**
+
+Watch the specified files, and tail them in near real-time once new lines are detected appended to each file.
+If new lines are being written, this source retries reading them while it waits for the write to complete.
+
+This source is reliable and will not miss data even when the tailed files rotate.
+It periodically writes the last read position of each file to the given position file in JSON format.
+If Flume is stopped or goes down for some reason, it can restart tailing from the positions written in the existing position file.
+
+In other use cases, this source can also start tailing from an arbitrary position for each file by using the given position file.
+When there is no position file at the specified path, it starts tailing from the first line of each file by default.
+
+Files will be consumed in order of their modification time. The file with the oldest modification time will be consumed first.
+
+This source does not rename, delete or otherwise modify the file being tailed.
+Currently this source does not support tailing binary files. It reads text files line by line.
+
+=================================== ============================== 
===================================================
+Property Name                       Default                        Description
+=================================== ============================== 
===================================================
+**channels**                        --
+**type**                            --                             The 
component type name, needs to be ``TAILDIR``.
+**filegroups**                      --                             
Space-separated list of file groups. Each file group indicates a set of files 
to be tailed.
+**filegroups.<filegroupName>**      --                             Absolute path of the file group. Regular expressions (and not file system patterns) can be used for the filename only.
+positionFile                        ~/.flume/taildir_position.json File in 
JSON format to record the inode, the absolute path and the last position of 
each tailing file.
+headers.<filegroupName>.<headerKey> --                             Header value which is set with the header key. Multiple headers can be specified for one file group.
+byteOffsetHeader                    false                          Whether to 
add the byte offset of a tailed line to a header called 'byteoffset'.
+skipToEnd                           false                          Whether to skip the position to EOF for files that are not recorded in the position file.
+idleTimeout                         120000                         Time (ms) to close inactive files. If new lines are appended to the closed file, this source will automatically re-open it.
+writePosInterval                    3000                           Interval time (ms) at which the last position of each file is written to the position file.
+batchSize                           100                            Max number 
of lines to read and send to the channel at a time. Using the default is 
usually fine.
+backoffSleepIncrement               1000                           The 
increment for time delay before reattempting to poll for new data, when the 
last attempt did not find any new data.
+maxBackoffSleep                     5000                           The max 
time delay between each reattempt to poll for new data, when the last attempt 
did not find any new data.
+cachePatternMatching                true                           Listing 
directories and applying the filename regex pattern may be time consuming for 
directories
+                                                                   containing 
thousands of files. Caching the list of matching files can improve performance.
+                                                                   The order 
in which files are consumed will also be cached.
+                                                                   Requires 
that the file system keeps track of modification times with at least a 1-second 
granularity.
+fileHeader                          false                          Whether to 
add a header storing the absolute path filename.
+fileHeaderKey                       file                           Header key 
to use when appending absolute path filename to event header.
+=================================== ============================== 
===================================================
+
+Example for agent named a1:
+
+.. code-block:: properties
+
+  a1.sources = r1
+  a1.channels = c1
+  a1.sources.r1.type = TAILDIR
+  a1.sources.r1.channels = c1
+  a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
+  a1.sources.r1.filegroups = f1 f2
+  a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
+  a1.sources.r1.headers.f1.headerKey1 = value1
+  a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
+  a1.sources.r1.headers.f2.headerKey1 = value2
+  a1.sources.r1.headers.f2.headerKey2 = value2-2
+  a1.sources.r1.fileHeader = true
+
 Twitter 1% firehose Source (experimental)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 .. warning::
-  This source is hightly experimental and may change between minor versions of 
Flume.
+  This source is highly experimental and may change between minor versions of 
Flume.
   Use at your own risk.
 
 Experimental source that connects via Streaming API to the 1% sample twitter
 firehose, continuously downloads tweets, converts them to Avro format and
-sends Avro events to a downstream Flume sink. Requires the consumer and 
+sends Avro events to a downstream Flume sink. Requires the consumer and
 access tokens and secrets of a Twitter developer account.
 Required properties are in **bold**.
 
@@ -1111,7 +1213,7 @@ Property Name          Default      Desc
 **consumerKey**        --           OAuth consumer key
 **consumerSecret**     --           OAuth consumer secret
 **accessToken**        --           OAuth access token
-**accessTokenSecret**  --           OAuth toekn secret 
+**accessTokenSecret**  --           OAuth token secret
 maxBatchSize           1000         Maximum number of twitter messages to put 
in a single batch
 maxBatchDurationMillis 1000         Maximum number of milliseconds to wait 
before closing a batch
 ====================== ===========  
===================================================
@@ -1134,57 +1236,224 @@ Example for agent named a1:
 Kafka Source
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-Kafka Source is an Apache Kafka consumer that reads messages from a Kafka 
topic.
+Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics.
 If you have multiple Kafka sources running, you can configure them with the 
same Consumer Group
-so each will read a unique set of partitions for the topic.
+so each will read a unique set of partitions for the topics.
 
+==================================  ===========  
===================================================
+Property Name                       Default      Description
+==================================  ===========  
===================================================
+**channels**                        --
+**type**                            --           The component type name, 
needs to be ``org.apache.flume.source.kafka.KafkaSource``
+**kafka.bootstrap.servers**         --           List of brokers in the Kafka 
cluster used by the source
+kafka.consumer.group.id             flume        Unique identifier of the consumer group. Setting the same id in multiple sources or agents
+                                                 indicates that they are part of the same consumer group
+**kafka.topics**                    --           Comma-separated list of 
topics the kafka consumer will read messages from.
+**kafka.topics.regex**              --           Regex that defines the set of topics the source is subscribed to. This property has higher priority
+                                                 than ``kafka.topics`` and overrides ``kafka.topics`` if it exists.
+batchSize                           1000         Maximum number of messages 
written to Channel in one batch
+batchDurationMillis                 1000         Maximum time (in ms) before a batch will be written to the Channel.
+                                                 The batch will be written whenever the first of the size or time limit is reached.
+backoffSleepIncrement               1000         Initial and incremental wait 
time that is triggered when a Kafka Topic appears to be empty.
+                                                 Wait period will reduce 
aggressive pinging of an empty Kafka Topic.  One second is ideal for
+                                                 ingestion use cases but a 
lower value may be required for low latency operations with
+                                                 interceptors.
+maxBackoffSleep                     5000         Maximum wait time that is 
triggered when a Kafka Topic appears to be empty.  Five seconds is
+                                                 ideal for ingestion use cases 
but a lower value may be required for low latency operations
+                                                 with interceptors.
+useFlumeEventFormat                 false        By default events are taken 
as bytes from the Kafka topic directly into the event body. Set to
+                                                 true to read events as the 
Flume Avro binary format. Used in conjunction with the same property
+                                                 on the KafkaSink or with the 
parseAsFlumeEvent property on the Kafka Channel this will preserve
+                                                 any Flume headers sent on the 
producing side.
+migrateZookeeperOffsets             true         When no Kafka stored offset 
is found, look up the offsets in Zookeeper and commit them to Kafka.
+                                                 This should be true to 
support seamless Kafka client migration from older versions of Flume.
+                                                 Once migrated this can be set 
to false, though that should generally not be required.
+                                                 If no Zookeeper offset is 
found, the Kafka configuration kafka.consumer.auto.offset.reset
+                                                 defines how offsets are 
handled.
+                                                 Check `Kafka documentation 
<http://kafka.apache.org/documentation.html#newconsumerconfigs>`_ for details
+kafka.consumer.security.protocol    PLAINTEXT    Set to SASL_PLAINTEXT, 
SASL_SSL or SSL if writing to Kafka using some level of security. See below for 
additional info on secure setup.
+*more consumer security props*                   If using SASL_PLAINTEXT, 
SASL_SSL or SSL refer to `Kafka security 
<http://kafka.apache.org/documentation.html#security>`_ for additional
+                                                 properties that need to be 
set on consumer.
+Other Kafka Consumer Properties     --           These properties are used to 
configure the Kafka Consumer. Any consumer property supported
+                                                 by Kafka can be used. The 
only requirement is to prepend the property name with the prefix
+                                                 ``kafka.consumer``.
+                                                 For example: 
``kafka.consumer.auto.offset.reset``
+==================================  ===========  
===================================================
 
+.. note:: The Kafka Source overrides two Kafka consumer parameters:
+          auto.commit.enable is set to "false" by the source and every batch is committed. The Kafka Source guarantees an
+          at-least-once message retrieval strategy. Duplicates can be present when the source starts.
+          The Kafka Source also provides defaults for the key.deserializer (org.apache.kafka.common.serialization.StringSerializer)
+          and value.deserializer (org.apache.kafka.common.serialization.ByteArraySerializer). Modification of these parameters is not recommended.
 
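+For example, to have the source start from the earliest available offset when no committed offset
+exists, the corresponding consumer property can be passed through with the ``kafka.consumer.``
+prefix. A sketch (the agent and source names are hypothetical):
+
+.. code-block:: properties
+
+    tier1.sources.source1.kafka.consumer.auto.offset.reset = earliest
+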
-===============================  ===========  
===================================================
-Property Name                    Default      Description
-===============================  ===========  
===================================================
-**channels**                     --
-**type**                         --           The component type name, needs 
to be ``org.apache.flume.source.kafka,KafkaSource``
-**zookeeperConnect**             --           URI of ZooKeeper used by Kafka 
cluster
-**groupId**                      flume        Unique identified of consumer 
group. Setting the same id in multiple sources or agents
-                                              indicates that they are part of 
the same consumer group
-**topic**                        --           Kafka topic we'll read messages 
from. At the time, this is a single topic only.
-batchSize                        1000         Maximum number of messages 
written to Channel in one batch
-batchDurationMillis              1000         Maximum time (in ms) before a 
batch will be written to Channel
-                                              The batch will be written 
whenever the first of size and time will be reached.
-backoffSleepIncrement            1000         Initial and incremental wait 
time that is triggered when a Kafka Topic appears to be empty.
-                                              Wait period will reduce 
aggressive pinging of an empty Kafka Topic.  One second is ideal for
-                                              ingestion use cases but a lower 
value may be required for low latency operations with
-                                              interceptors.
-maxBackoffSleep                  5000         Maximum wait time that is 
triggered when a Kafka Topic appears to be empty.  Five seconds is
-                                              ideal for ingestion use cases 
but a lower value may be required for low latency operations
-                                              with interceptors.
-Other Kafka Consumer Properties  --           These properties are used to 
configure the Kafka Consumer. Any producer property supported
-                                              by Kafka can be used. The only 
requirement is to prepend the property name with the prefix ``kafka.``.
-                                              For example: 
kafka.consumer.timeout.ms
-                                              Check `Kafka documentation 
<https://kafka.apache.org/08/configuration.html#consumerconfigs>` for details
-===============================  ===========  
===================================================
+Deprecated Properties
 
-.. note:: The Kafka Source overrides two Kafka consumer parameters:
-          auto.commit.enable is set to "false" by the source and we commit 
every batch. For improved performance
-          this can be set to "true", however, this can lead to loss of data
-          consumer.timeout.ms is set to 10ms, so when we check Kafka for new 
data we wait at most 10ms for the data to arrive
-          setting this to a higher value can reduce CPU utilization (we'll 
poll Kafka in less of a tight loop), but also means
-          higher latency in writing batches to channel (since we'll wait 
longer for data to arrive).
+===============================  ===================  
=============================================================================================
+Property Name                    Default              Description
+===============================  ===================  
=============================================================================================
+topic                            --                   Use kafka.topics
+groupId                          flume                Use 
kafka.consumer.group.id
+zookeeperConnect                 --                   Is no longer supported 
by kafka consumer client since 0.9.x. Use kafka.bootstrap.servers
+                                                      to establish connection 
with kafka cluster
+===============================  ===================  
=============================================================================================
 
+Example for topic subscription by comma-separated topic list.
 
-Example for agent named tier1:
+.. code-block:: properties
+
+    tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
+    tier1.sources.source1.channels = channel1
+    tier1.sources.source1.batchSize = 5000
+    tier1.sources.source1.batchDurationMillis = 2000
+    tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
+    tier1.sources.source1.kafka.topics = test1, test2
+    tier1.sources.source1.kafka.consumer.group.id = custom.g.id
+
+Example for topic subscription by regex
 
 .. code-block:: properties
 
     tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
     tier1.sources.source1.channels = channel1
-    tier1.sources.source1.zookeeperConnect = localhost:2181
-    tier1.sources.source1.topic = test1
-    tier1.sources.source1.groupId = flume
-    tier1.sources.source1.kafka.consumer.timeout.ms = 100
+    tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
+    tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
+    # the default kafka.consumer.group.id=flume is used
+
+
+**Security and Kafka Source:**
+
+Secure authentication as well as data encryption is supported on the 
communication channel between Flume and Kafka.
+For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the 
parameter is named SSL, the actual protocol is a TLS implementation) can be 
used from Kafka version 0.9.0.
+
+As of now data encryption is solely provided by SSL/TLS.
+
+Setting ``kafka.consumer.security.protocol`` to any of the following values means:
+
+- **SASL_PLAINTEXT** - Kerberos or plaintext authentication with no data 
encryption
+- **SASL_SSL** - Kerberos or plaintext authentication with data encryption
+- **SSL** - TLS based encryption with optional authentication.
+
+.. warning::
+    There is a performance degradation when SSL is enabled,
+    the magnitude of which depends on the CPU type and the JVM implementation.
+    Reference: `Kafka security overview 
<http://kafka.apache.org/documentation#security_overview>`_
+    and the jira for tracking this issue:
+    `KAFKA-2561 <https://issues.apache.org/jira/browse/KAFKA-2561>`_
+
+
+**TLS and Kafka Source:**
+
+Please read the steps described in `Configuring Kafka Clients SSL 
<http://kafka.apache.org/documentation#security_configclients>`_
+to learn about additional configuration settings for fine tuning, for example any of the following:
+security provider, cipher suites, enabled protocols, truststore or keystore types.
+
+Example configuration with server side authentication and data encryption.
+
+.. code-block:: properties
+
+    a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
+    a1.channels.channel1.kafka.bootstrap.servers = 
kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.channels.channel1.kafka.topic = channel1
+    a1.channels.channel1.kafka.consumer.group.id = flume-consumer
+    a1.channels.channel1.kafka.consumer.security.protocol = SSL
+    
a1.channels.channel1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
+    a1.channels.channel1.kafka.consumer.ssl.truststore.password=<password to 
access the truststore>
+
+
+Note: By default the property ``ssl.endpoint.identification.algorithm``
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following properties
+
+.. code-block:: properties
+
+    
a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS
+
+Once enabled, clients will verify the server's fully qualified domain name 
(FQDN)
+against one of the following two fields:
+
+#) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
+#) Subject Alternative Name (SAN) 
https://tools.ietf.org/html/rfc5280#section-4.2.1.6
+
+If client side authentication is also required then the following should additionally be added to the Flume agent configuration.
+Each Flume agent has to have its own client certificate which has to be trusted by the Kafka brokers either
+individually or by their signature chain. A common example is to sign each client certificate with a single Root CA
+which in turn is trusted by the Kafka brokers.
+
+.. code-block:: properties
+
+    
a1.channels.channel1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
+    a1.channels.channel1.kafka.consumer.ssl.keystore.password=<password to 
access the keystore>
+
+If the keystore and key use different password protection then the ``ssl.key.password`` property will
+provide the required additional secret for the consumer keystore:
+
+.. code-block:: properties
+
+    a1.channels.channel1.kafka.consumer.ssl.key.password=<password to access 
the key>
+
+
+**Kerberos and Kafka Source:**
+
+To use the Kafka source with a Kafka cluster secured with Kerberos, set the ``consumer.security.protocol`` properties noted above for the consumer.
+The Kerberos keytab and principal to be used with the Kafka brokers are specified in a JAAS file's "KafkaClient" section. The "Client" section describes the
+Zookeeper connection if needed.
+See `Kafka doc 
<http://kafka.apache.org/documentation.html#security_sasl_clientconfig>`_
+for information on the JAAS file contents. The location of this JAAS file and optionally the system-wide Kerberos configuration can be specified via
+JAVA_OPTS in flume-env.sh:
+
+.. code-block:: properties
+
+    JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
+    JAVA_OPTS="$JAVA_OPTS 
-Djava.security.auth.login.config=/path/to/flume_jaas.conf"
 
+Example secure configuration using SASL_PLAINTEXT:
 
+.. code-block:: properties
+
+    a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
+    a1.channels.channel1.kafka.bootstrap.servers = 
kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.channels.channel1.kafka.topic = channel1
+    a1.channels.channel1.kafka.consumer.group.id = flume-consumer
+    a1.channels.channel1.kafka.consumer.security.protocol = SASL_PLAINTEXT
+    a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
+    a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
+
+Example secure configuration using SASL_SSL:
+
+.. code-block:: properties
+
+    a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
+    a1.channels.channel1.kafka.bootstrap.servers = 
kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.channels.channel1.kafka.topic = channel1
+    a1.channels.channel1.kafka.consumer.group.id = flume-consumer
+    a1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL
+    a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
+    a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
+    
a1.channels.channel1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
+    a1.channels.channel1.kafka.consumer.ssl.truststore.password=<password to 
access the truststore>
+
+
+Sample JAAS file. For reference on its content please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN)
+in the Kafka documentation of `SASL configuration <http://kafka.apache.org/documentation#security_sasl_clientconfig>`_.
+Since the Kafka Source may also connect to Zookeeper for offset migration, the "Client" section was also added to this example.
+This won't be needed unless you require offset migration, or you require this section for other secure components.
+Also please make sure that the operating system user of the Flume processes has read privileges on the JAAS and keytab files.
+
+.. code-block:: javascript
+
+    Client {
+      com.sun.security.auth.module.Krb5LoginModule required
+      useKeyTab=true
+      storeKey=true
+      keyTab="/path/to/keytabs/flume.keytab"
+      principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
+    };
+
+    KafkaClient {
+      com.sun.security.auth.module.Krb5LoginModule required
+      useKeyTab=true
+      storeKey=true
+      keyTab="/path/to/keytabs/flume.keytab"
+      principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
+    };
 
 
 NetCat Source
@@ -1221,27 +1490,28 @@ Example for agent named a1:
   a1.channels = c1
   a1.sources.r1.type = netcat
   a1.sources.r1.bind = 0.0.0.0
-  a1.sources.r1.bind = 6666
+  a1.sources.r1.port = 6666
   a1.sources.r1.channels = c1
 
 Sequence Generator Source
 ~~~~~~~~~~~~~~~~~~~~~~~~~
 
-A simple sequence generator that continuously generates events with a counter
-that starts from 0 and increments by 1. Useful mainly for testing.
-Required properties are in **bold**.
-
-==============  ===========  ========================================
-Property Name   Default      Description
-==============  ===========  ========================================
+A simple sequence generator that continuously generates events with a counter that starts from 0,
+increments by 1 and stops at totalEvents. It retries when it can't send events to the channel. Useful
+mainly for testing. Required properties are in **bold**.
+
+==============  ===============  ========================================
+Property Name   Default          Description
+==============  ===============  ========================================
 **channels**    --
-**type**        --           The component type name, needs to be ``seq``
-selector.type                replicating or multiplexing
-selector.*      replicating  Depends on the selector.type value
-interceptors    --           Space-separated list of interceptors
+**type**        --               The component type name, needs to be ``seq``
+selector.type                    replicating or multiplexing
+selector.*      replicating      Depends on the selector.type value
+interceptors    --               Space-separated list of interceptors
 interceptors.*
 batchSize       1
-==============  ===========  ========================================
+totalEvents     Long.MAX_VALUE   Number of unique events sent by the source.
+==============  ===============  ========================================
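+
+For example, ``totalEvents`` can cap a smoke test at a fixed number of events. A minimal sketch
+(the agent and component names are hypothetical):
+
+.. code-block:: properties
+
+  a1.sources = r1
+  a1.channels = c1
+  a1.sources.r1.type = seq
+  a1.sources.r1.channels = c1
+  # stop after exactly 10000 generated events
+  a1.sources.r1.totalEvents = 10000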
 
 Example for agent named a1:
 
@@ -1660,34 +1930,39 @@ required.
 
 The following are the escape sequences supported:
 
-=========  =================================================
-Alias      Description
-=========  =================================================
-%{host}    Substitute value of event header named "host". Arbitrary header 
names are supported.
-%t         Unix time in milliseconds
-%a         locale's short weekday name (Mon, Tue, ...)
-%A         locale's full weekday name (Monday, Tuesday, ...)
-%b         locale's short month name (Jan, Feb, ...)
-%B         locale's long month name (January, February, ...)
-%c         locale's date and time (Thu Mar 3 23:05:25 2005)
-%d         day of month (01)
-%e         day of month without padding (1)
-%D         date; same as %m/%d/%y
-%H         hour (00..23)
-%I         hour (01..12)
-%j         day of year (001..366)
-%k         hour ( 0..23)
-%m         month (01..12)
-%n         month without padding (1..12)
-%M         minute (00..59)
-%p         locale's equivalent of am or pm
-%s         seconds since 1970-01-01 00:00:00 UTC
-%S         second (00..59)
-%y         last two digits of year (00..99)
-%Y         year (2010)
-%z         +hhmm numeric timezone (for example, -0400)
-=========  =================================================
+===============  =================================================
+Alias            Description
+===============  =================================================
+%{host}          Substitute value of event header named "host". Arbitrary 
header names are supported.
+%t               Unix time in milliseconds
+%a               locale's short weekday name (Mon, Tue, ...)
+%A               locale's full weekday name (Monday, Tuesday, ...)
+%b               locale's short month name (Jan, Feb, ...)
+%B               locale's long month name (January, February, ...)
+%c               locale's date and time (Thu Mar 3 23:05:25 2005)
+%d               day of month (01)
+%e               day of month without padding (1)
+%D               date; same as %m/%d/%y
+%H               hour (00..23)
+%I               hour (01..12)
+%j               day of year (001..366)
+%k               hour ( 0..23)
+%m               month (01..12)
+%n               month without padding (1..12)
+%M               minute (00..59)
+%p               locale's equivalent of am or pm
+%s               seconds since 1970-01-01 00:00:00 UTC
+%S               second (00..59)
+%y               last two digits of year (00..99)
+%Y               year (2010)
+%z               +hhmm numeric timezone (for example, -0400)
+%[localhost]     Substitute the hostname of the host where the agent is running
+%[IP]            Substitute the IP address of the host where the agent is 
running
+%[FQDN]          Substitute the canonical hostname of the host where the agent 
is running
+===============  =================================================
 
+Note: The escape strings %[localhost], %[IP] and %[FQDN] all rely on Java's 
ability to obtain the
+hostname, which may fail in some networking environments.
 
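+For illustration, these escapes are typically referenced from the sink's ``hdfs.path`` and
+``hdfs.filePrefix`` settings. A minimal sketch of a hypothetical sink that buckets events by hour
+and tags files with the local hostname (the agent and sink names are made up):
+
+.. code-block:: properties
+
+  # one directory per hour, files prefixed with the agent's hostname
+  a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H
+  a1.sinks.k1.hdfs.filePrefix = %[localhost]
+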
 The file in use will have the name mangled to include ".tmp" at the end. Once
 the file is closed, this extension is removed. This allows excluding partially
@@ -1773,8 +2048,7 @@ This sink streams events containing deli
 Events are written using Hive transactions. As soon as a set of events are 
committed to Hive, they become
 immediately visible to Hive queries. Partitions to which flume will stream to 
can either be pre-created
 or, optionally, Flume can create them if they are missing. Fields from 
incoming event data are mapped to
-corresponding columns in the Hive table. **This sink is provided as a preview 
feature and not recommended
-for use in production.**
+corresponding columns in the Hive table.
 
 ======================    ============  
======================================================================
 Name                      Default       Description
@@ -1912,8 +2186,9 @@ accept tab separated input containing th
 Logger Sink
 ~~~~~~~~~~~
 
-Logs event at INFO level. Typically useful for testing/debugging purpose.
-Required properties are in **bold**.
+Logs events at the INFO level. Typically useful for testing/debugging purposes. Required properties are
+in **bold**. This sink is the only exception which doesn't require the extra configuration
+explained in the `Logging raw data`_ section.
 
 ==============  =======  ===========================================
 Property Name   Default  Description
@@ -2065,16 +2340,19 @@ File Roll Sink
 Stores events on the local filesystem.
 Required properties are in **bold**.
 
-===================  =======  
======================================================================================================================
-Property Name        Default  Description
-===================  =======  
======================================================================================================================
-**channel**          --
-**type**             --       The component type name, needs to be 
``file_roll``.
-**sink.directory**   --       The directory where files will be stored
-sink.rollInterval    30       Roll the file every 30 seconds. Specifying 0 
will disable rolling and cause all events to be written to a single file.
-sink.serializer      TEXT     Other possible options include ``avro_event`` or 
the FQCN of an implementation of EventSerializer.Builder interface.
-batchSize            100
-===================  =======  
======================================================================================================================
+==========================  =======  
======================================================================================================================
+Property Name               Default  Description
+==========================  =======  
======================================================================================================================
+**channel**                 --
+**type**                    --       The component type name, needs to be 
``file_roll``.
+**sink.directory**          --       The directory where files will be stored
+sink.pathManager            DEFAULT  The PathManager implementation to use.
+sink.pathManager.extension  --       The file extension if the default 
PathManager is used.
+sink.pathManager.prefix     --       A character string to add to the 
beginning of the file name if the default PathManager is used
+sink.rollInterval           30       Roll the file every 30 seconds. 
Specifying 0 will disable rolling and cause all events to be written to a 
single file.
+sink.serializer             TEXT     Other possible options include 
``avro_event`` or the FQCN of an implementation of EventSerializer.Builder 
interface.
+batchSize                   100
+==========================  =======  
======================================================================================================================
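+
+For instance, the default PathManager can be given a prefix and an extension so that rolled files
+get predictable names. A minimal sketch (the agent name, directory and naming choices are
+hypothetical):
+
+.. code-block:: properties
+
+  a1.sinks.k1.type = file_roll
+  a1.sinks.k1.channel = c1
+  a1.sinks.k1.sink.directory = /var/log/flume/out
+  # rolled file names will carry this prefix and extension under the default PathManager
+  a1.sinks.k1.sink.pathManager.prefix = events-
+  a1.sinks.k1.sink.pathManager.extension = log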
 
 Example for agent named a1:
 
@@ -2230,19 +2508,19 @@ This sink extracts data from Flume event
 
 This sink is well suited for use cases that stream raw data into HDFS (via the 
HdfsSink) and simultaneously extract, transform and load the same data into 
Solr (via MorphlineSolrSink). In particular, this sink can process arbitrary 
heterogeneous raw data from disparate data sources and turn it into a data 
model that is useful to Search applications.
 
-The ETL functionality is customizable using a `morphline configuration file 
<http://cloudera.github.io/cdk/docs/current/cdk-morphlines/index.html>`_ that 
defines a chain of transformation commands that pipe event records from one 
command to another. 
+The ETL functionality is customizable using a `morphline configuration file 
<http://cloudera.github.io/cdk/docs/current/cdk-morphlines/index.html>`_ that 
defines a chain of transformation commands that pipe event records from one 
command to another.
 
 Morphlines can be seen as an evolution of Unix pipelines where the data model 
is generalized to work with streams of generic records, including arbitrary 
binary payloads. A morphline command is a bit like a Flume Interceptor. 
Morphlines can be embedded into Hadoop components such as Flume.
 
 Commands to parse and transform a set of standard data formats such as log 
files, Avro, CSV, Text, HTML, XML, PDF, Word, Excel, etc. are provided out of 
the box, and additional custom commands and parsers for additional data formats 
can be added as morphline plugins. Any kind of data format can be indexed and 
any Solr documents for any kind of Solr schema can be generated, and any custom 
ETL logic can be registered and executed.
 
-Morphlines manipulate continuous streams of records. The data model can be 
described as follows: A record is a set of named fields where each field has an 
ordered list of one or more values. A value can be any Java Object. That is, a 
record is essentially a hash table where each hash table entry contains a 
String key and a list of Java Objects as values. (The implementation uses 
Guava's ``ArrayListMultimap``, which is a ``ListMultimap``). Note that a field 
can have multiple values and any two records need not use common field names. 
+Morphlines manipulate continuous streams of records. The data model can be 
described as follows: A record is a set of named fields where each field has an 
ordered list of one or more values. A value can be any Java Object. That is, a 
record is essentially a hash table where each hash table entry contains a 
String key and a list of Java Objects as values. (The implementation uses 
Guava's ``ArrayListMultimap``, which is a ``ListMultimap``). Note that a field 
can have multiple values and any two records need not use common field names.
 
 This sink fills the body of the Flume event into the ``_attachment_body`` 
field of the morphline record, as well as copies the headers of the Flume event 
into record fields of the same name. The commands can then act on this data.
 
 Routing to a SolrCloud cluster is supported to improve scalability. Indexing 
load can be spread across a large number of MorphlineSolrSinks for improved 
scalability. Indexing load can be replicated across multiple MorphlineSolrSinks 
for high availability, for example using Flume features such as Load balancing 
Sink Processor. MorphlineInterceptor can also help to implement dynamic routing 
to multiple Solr collections (e.g. for multi-tenancy).
 
-The morphline and solr jars required for your environment must be placed in 
the lib directory of the Apache Flume installation. 
+The morphline and solr jars required for your environment must be placed in 
the lib directory of the Apache Flume installation.
 
 The type is the FQCN: org.apache.flume.sink.solr.morphline.MorphlineSolrSink
 
@@ -2280,11 +2558,11 @@ ElasticSearchSink
 ~~~~~~~~~~~~~~~~~
 
 This sink writes data to an elasticsearch cluster. By default, events will be 
written so that the `Kibana <http://kibana.org>`_ graphical interface
-can display them - just as if `logstash <https://logstash.net>`_ wrote them. 
+can display them - just as if `logstash <https://logstash.net>`_ wrote them.
 
-The elasticsearch and lucene-core jars required for your environment must be 
placed in the lib directory of the Apache Flume installation. 
+The elasticsearch and lucene-core jars required for your environment must be 
placed in the lib directory of the Apache Flume installation.
 Elasticsearch requires that the major version of the client JAR match that of 
the server and that both are running the same minor version
-of the JVM. SerializationExceptions will appear if this is incorrect. To 
+of the JVM. SerializationExceptions will appear if this is incorrect. To
 select the required version first determine the version of elasticsearch and 
the JVM version the target cluster is running. Then select an elasticsearch 
client
 library which matches the major version. A 0.19.x client can talk to a 0.19.x 
cluster; 0.20.x can talk to 0.20.x and 0.90.x can talk to 0.90.x. Once the
 elasticsearch version has been determined then read the pom.xml file to 
determine the correct lucene-core JAR version to use. The Flume agent
@@ -2416,52 +2694,212 @@ Kafka Sink
 This is a Flume Sink implementation that can publish data to a
 `Kafka <http://kafka.apache.org/>`_ topic. One of the objectives is to integrate Flume
 with Kafka so that pull based processing systems can process the data coming
-through various Flume sources. This currently supports Kafka 0.8.x series of 
releases.
+through various Flume sources. This currently supports Kafka 0.9.x series of 
releases.
+
+This version of Flume no longer supports older versions (0.8.x) of Kafka.
 
 Required properties are marked in bold font.
 
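+A minimal sketch of a sink definition using the new Kafka 0.9 style property names is shown below;
+the agent, channel and broker address are hypothetical, and the individual properties are described
+in the table that follows.
+
+.. code-block:: properties
+
+    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
+    a1.sinks.k1.channel = c1
+    a1.sinks.k1.kafka.topic = mytopic
+    a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
+    a1.sinks.k1.flumeBatchSize = 20
+    # -1 waits for all replicas and avoids data loss on leader failure
+    a1.sinks.k1.kafka.producer.acks = -1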
 
-===============================  ===================  
=============================================================================================
-Property Name                    Default              Description
-===============================  ===================  
=============================================================================================
-**type**                         --                   Must be set to 
``org.apache.flume.sink.kafka.KafkaSink``
-**brokerList**                   --                   List of brokers 
Kafka-Sink will connect to, to get the list of topic partitions
-                                                      This can be a partial 
list of brokers, but we recommend at least two for HA.
-                                                      The format is comma 
separated list of hostname:port
-topic                            default-flume-topic  The topic in Kafka to 
which the messages will be published. If this parameter is configured,
-                                                      messages will be 
published to this topic.
-                                                      If the event header 
contains a "topic" field, the event will be published to that topic
-                                                      overriding the topic 
configured here.
-batchSize                        100                  How many messages to 
process in one batch. Larger batches improve throughput while adding latency.
-requiredAcks                     1                    How many replicas must 
acknowledge a message before its considered successfully written.
-                                                      Accepted values are 0 
(Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all 
replicas)
-                                                      Set this to -1 to avoid 
data loss in some cases of leader failure.
-Other Kafka Producer Properties  --                   These properties are 
used to configure the Kafka Producer. Any producer property supported
-                                                      by Kafka can be used. 
The only requirement is to prepend the property name with the prefix ``kafka.``.
-                                                      For example: 
kafka.producer.type
-===============================  ===================  
=============================================================================================
+==================================  ===================  
=============================================================================================
+Property Name                       Default              Description
+==================================  ===================  
=============================================================================================
+**type**                            --                   Must be set to 
``org.apache.flume.sink.kafka.KafkaSink``
+**kafka.bootstrap.servers**         --                   List of brokers 
Kafka-Sink will connect to, to get the list of topic partitions
+                                                         This can be a partial 
list of brokers, but we recommend at least two for HA.
+                                                         The format is comma 
separated list of hostname:port
+kafka.topic                         default-flume-topic  The topic in Kafka to 
which the messages will be published. If this parameter is configured,
+                                                         messages will be 
published to this topic.
+                                                         If the event header 
contains a "topic" field, the event will be published to that topic
+                                                         overriding the topic 
configured here.
+flumeBatchSize                      100                  How many messages to 
process in one batch. Larger batches improve throughput while adding latency.
+kafka.producer.acks                 1                    How many replicas 
must acknowledge a message before it's considered successfully written.
+                                                         Accepted values are 0 
(Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all 
replicas)
+                                                         Set this to -1 to 
avoid data loss in some cases of leader failure.
+useFlumeEventFormat                 false                By default events are 
put as bytes onto the Kafka topic directly from the event body. Set to
+                                                         true to store events 
as the Flume Avro binary format. Used in conjunction with the same property
+                                                         on the KafkaSource or 
with the parseAsFlumeEvent property on the Kafka Channel this will preserve
+                                                         any Flume headers for 
the producing side.
+defaultPartitionId                  --                   Specifies a Kafka 
partition ID (integer) for all events in this channel to be sent to, unless
+                                                         overridden by ``partitionIdHeader``. By default, if this property is not set, events will be
+                                                         distributed by the 
Kafka Producer's partitioner - including by ``key`` if specified (or by a
+                                                         partitioner specified 
by ``kafka.partitioner.class``).
+partitionIdHeader                   --                   When set, the sink 
will take the value of the field named using the value of this property
+                                                         from the event header 
and send the message to the specified partition of the topic. If the
+                                                         value represents an 
invalid partition, an EventDeliveryException will be thrown. If the header value
+                                                         is present then this 
setting overrides ``defaultPartitionId``.
+kafka.producer.security.protocol    PLAINTEXT            Set to 
SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of 
security. See below for additional info on secure setup.
+*more producer security props*                           If using 
SASL_PLAINTEXT, SASL_SSL or SSL refer to `Kafka security 
<http://kafka.apache.org/documentation.html#security>`_ for additional
+                                                         properties that need 
to be set on producer.
+Other Kafka Producer Properties     --                   These properties are 
used to configure the Kafka Producer. Any producer property supported
+                                                         by Kafka can be used. 
The only requirement is to prepend the property name with the prefix
+                                                         ``kafka.producer``.
+                                                         For example: 
kafka.producer.linger.ms
+==================================  ===================  
=============================================================================================
 
 .. note::   Kafka Sink uses the ``topic`` and ``key`` properties from the 
FlumeEvent headers to send events to Kafka.
             If ``topic`` exists in the headers, the event will be sent to that 
specific topic, overriding the topic configured for the Sink.
            If ``key`` exists in the headers, the key will be used by Kafka to
partition the data between the topic partitions. Events with the same key
             will be sent to the same partition. If the key is null, events 
will be sent to random partitions.
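+
+As a small illustrative sketch (not part of the original examples), the ``key`` header can be
+populated upstream, for instance with Flume's Static Interceptor on a hypothetical source ``r1``
+feeding this sink, so that all events from that source land in the same Kafka partition:
+
+.. code-block:: properties
+
+    # hypothetical source r1; the static interceptor adds a fixed "key" header,
+    # which the Kafka sink then uses as the Kafka message key
+    a1.sources.r1.interceptors = i1
+    a1.sources.r1.interceptors.i1.type = static
+    a1.sources.r1.interceptors.i1.key = key
+    a1.sources.r1.interceptors.i1.value = datacenter-1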
 
+The Kafka sink also provides defaults for the key.serializer
+(org.apache.kafka.common.serialization.StringSerializer) and the value.serializer
+(org.apache.kafka.common.serialization.ByteArraySerializer). Modifying these parameters is not recommended.
+
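+Purely to illustrate the ``kafka.producer`` prefix, a sketch of what overriding these defaults
+would look like (normally these lines are simply omitted, as the guide recommends keeping the
+defaults) is:
+
+.. code-block:: properties
+
+    # explicit form of the built-in serializer defaults; shown for illustration only
+    a1.sinks.k1.kafka.producer.key.serializer = org.apache.kafka.common.serialization.StringSerializer
+    a1.sinks.k1.kafka.producer.value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
+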
+Deprecated Properties
+
+===============================  ===================  
=============================================================================================
+Property Name                    Default              Description
+===============================  ===================  
=============================================================================================
+brokerList                       --                   Use 
kafka.bootstrap.servers
+topic                            default-flume-topic  Use kafka.topic
+batchSize                        100                  Use flumeBatchSize
+requiredAcks                     1                    Use kafka.producer.acks
+
+===============================  ===================  
=============================================================================================
+
 An example configuration of a Kafka sink is given below. Properties starting
-with the prefix ``kafka`` (the last 3 properties) are used when instantiating
-the Kafka producer. The properties that are passed when creating the Kafka
+with the prefix ``kafka.producer`` are used when instantiating the Kafka producer.
+The properties that are passed when creating the Kafka
 producer are not limited to the properties given in this example.
-Also it's possible include your custom properties here and access them inside
+Also it is possible to include your custom properties here and access them 
inside
 the preprocessor through the Flume Context object passed in as a method
 argument.
 
 .. code-block:: properties
 
-    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
-    a1.sinks.k1.topic = mytopic
-    a1.sinks.k1.brokerList = localhost:9092
-    a1.sinks.k1.requiredAcks = 1
-    a1.sinks.k1.batchSize = 20
     a1.sinks.k1.channel = c1
+    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
+    a1.sinks.k1.kafka.topic = mytopic
+    a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
+    a1.sinks.k1.flumeBatchSize = 20
+    a1.sinks.k1.kafka.producer.acks = 1
+    a1.sinks.k1.kafka.producer.linger.ms = 1
+    a1.sinks.k1.kafka.producer.compression.type = snappy
+
+
+**Security and Kafka Sink:**
+
+Secure authentication as well as data encryption is supported on the 
communication channel between Flume and Kafka.
+For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the 
parameter is named SSL, the actual protocol is a TLS implementation) can be 
used from Kafka version 0.9.0.
+
+As of now data encryption is solely provided by SSL/TLS.
+
+Setting ``kafka.producer.security.protocol`` to any of the following values means:
+
+- **SASL_PLAINTEXT** - Kerberos or plaintext authentication with no data 
encryption
+- **SASL_SSL** - Kerberos or plaintext authentication with data encryption
+- **SSL** - TLS based encryption with optional authentication.
+
+.. warning::
+    There is a performance degradation when SSL is enabled,
+    the magnitude of which depends on the CPU type and the JVM implementation.
+    Reference: `Kafka security overview 
<http://kafka.apache.org/documentation#security_overview>`_
+    and the jira for tracking this issue:
+    `KAFKA-2561 <https://issues.apache.org/jira/browse/KAFKA-2561>`__
+
+
+**TLS and Kafka Sink:**
+
+Please read the steps described in `Configuring Kafka Clients SSL 
<http://kafka.apache.org/documentation#security_configclients>`_
+to learn about additional configuration settings for fine tuning, for example any of the following:
+security provider, cipher suites, enabled protocols, truststore or keystore 
types.
+
+Example configuration with server side authentication and data encryption.
+
+.. code-block:: properties
+
+    a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
+    a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.sinks.sink1.kafka.topic = mytopic
+    a1.sinks.sink1.kafka.producer.security.protocol = SSL
+    a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
+    a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>
+
+
+Note: By default the property ``ssl.endpoint.identification.algorithm``
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following properties
+
+.. code-block:: properties
+
+    a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
+
+Once enabled, clients will verify the server's fully qualified domain name 
(FQDN)
+against one of the following two fields:
+
+#) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
+#) Subject Alternative Name (SAN) 
https://tools.ietf.org/html/rfc5280#section-4.2.1.6
+
+If client side authentication is also required then the following should additionally be
+added to the Flume agent configuration. Each Flume agent has to have its own client
+certificate which has to be trusted by the Kafka brokers either individually or by their
+signature chain. A common example is to sign each client certificate with a single Root CA
+which in turn is trusted by the Kafka brokers.
+
+.. code-block:: properties
+
+    a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
+    a1.sinks.sink1.kafka.producer.ssl.keystore.password = <password to access the keystore>
+
+If the keystore and key use different password protection then the ``ssl.key.password``
+property will provide the required additional secret for the producer keystore:
+
+.. code-block:: properties
+
+    a1.sinks.sink1.kafka.producer.ssl.key.password = <password to access the key>
+
+
+**Kerberos and Kafka Sink:**
+
+To use the Kafka sink with a Kafka cluster secured with Kerberos, set the
+``producer.security.protocol`` property noted above for the producer.
+The Kerberos keytab and principal to be used with the Kafka brokers are specified
+in a JAAS file's "KafkaClient" section. The "Client" section describes the
+Zookeeper connection if needed.
+See the `Kafka doc <http://kafka.apache.org/documentation.html#security_sasl_clientconfig>`_
+for information on the JAAS file contents. The location of this JAAS file and,
+optionally, the system-wide Kerberos configuration can be specified via
+JAVA_OPTS in flume-env.sh:
+
+.. code-block:: properties
+
+    JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
+    JAVA_OPTS="$JAVA_OPTS 
-Djava.security.auth.login.config=/path/to/flume_jaas.conf"
+
+Example secure configuration using SASL_PLAINTEXT:
+
+.. code-block:: properties
+
+    a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
+    a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.sinks.sink1.kafka.topic = mytopic
+    a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
+    a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
+    a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
+
+
+Example secure configuration using SASL_SSL:
+
+.. code-block:: properties
+
+    a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
+    a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.sinks.sink1.kafka.topic = mytopic
+    a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
+    a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
+    a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
+    a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
+    a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>
+
+
+Sample JAAS file. For reference on its contents please see the client config
+sections for the desired authentication mechanism (GSSAPI/PLAIN)
+in the Kafka documentation on `SASL configuration
+<http://kafka.apache.org/documentation#security_sasl_clientconfig>`_.
+Unlike the Kafka Source or Kafka Channel, a "Client" section is not required
+unless it is needed by other connecting components. Also please make sure
+that the operating system user of the Flume processes has read privileges on
+the JAAS and keytab files.
+
+.. code-block:: javascript
+
+    KafkaClient {
+      com.sun.security.auth.module.Krb5LoginModule required
+      useKeyTab=true
+      storeKey=true
+      keyTab="/path/to/keytabs/flume.keytab"
+      principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
+    };
+
 
 Custom Sink
 ~~~~~~~~~~~
@@ -2534,7 +2972,7 @@ Example for agent named a1:
   a1.channels.c1.transactionCapacity = 10000
   a1.channels.c1.byteCapacityBufferPercentage = 20
   a1.channels.c1.byteCapacity = 800000
-  
+
 
 JDBC Channel
 ~~~~~~~~~~~~
@@ -2579,40 +3017,82 @@ The events are stored in a Kafka cluster
 replication, so in case an agent or a kafka broker crashes, the events are 
immediately available to other sinks
 
 The Kafka channel can be used for multiple scenarios:
-* With Flume source and sink - it provides a reliable and highly available 
channel for events
-* With Flume source and interceptor but no sink - it allows writing Flume 
events into a Kafka topic, for use by other apps
-* With Flume sink, but no source - it is a low-latency, fault tolerant way to 
send events from Kafka to Flume sources such as HDFS, HBase or Solr
+
+#. With Flume source and sink - it provides a reliable and highly available 
channel for events
+#. With Flume source and interceptor but no sink - it allows writing Flume 
events into a Kafka topic, for use by other apps
+#. With Flume sink, but no source - it is a low-latency, fault-tolerant way to
send events from Kafka to Flume sinks such as HDFS, HBase or Solr (a sketch of this scenario follows below)
+
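+As a minimal, illustrative sketch of the third scenario (agent, channel and sink names, the
+broker address and the HDFS path are placeholders), an agent could consist of only a Kafka
+channel and an HDFS sink:
+
+.. code-block:: properties
+
+    # no sources: the Kafka channel itself consumes from the topic
+    a1.channels = c1
+    a1.sinks = k1
+    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
+    a1.channels.c1.kafka.bootstrap.servers = kafka-1:9092
+    a1.channels.c1.kafka.topic = channel1
+    # set to false if the topic is written by non-Flume producers
+    a1.channels.c1.parseAsFlumeEvent = false
+    a1.sinks.k1.type = hdfs
+    a1.sinks.k1.channel = c1
+    a1.sinks.k1.hdfs.path = /flume/kafka-events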
+
+This version of Flume requires Kafka version 0.9 or greater due to the 
reliance on the Kafka clients shipped with that version. The configuration of
+the channel has changed compared to previous Flume versions.
+
+The configuration parameters are organized as follows:
+
+#. Configuration values related to the channel generically are applied at the channel config level, e.g. a1.channels.k1.type
+#. Configuration values related to Kafka or how the channel operates are prefixed with "kafka." (these are analogous to the Kafka Common Client Configs), e.g. a1.channels.k1.kafka.topic and a1.channels.k1.kafka.bootstrap.servers. This is not dissimilar to how the HDFS sink operates
+#. Properties specific to the producer/consumer are prefixed by kafka.producer or kafka.consumer
+#. Where possible, the Kafka parameter names are used, e.g. bootstrap.servers and acks
+
+This version of Flume is backwards-compatible with previous versions; however, deprecated
+properties are indicated in the table below and a warning message
+is logged on startup when they are present in the configuration file.
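+
+A small sketch showing these prefix levels side by side (the channel name ``k1`` and the broker
+address are placeholders):
+
+.. code-block:: properties
+
+    # channel-level setting
+    a1.channels.k1.type = org.apache.flume.channel.kafka.KafkaChannel
+    # Kafka / common client settings, prefixed with kafka.
+    a1.channels.k1.kafka.bootstrap.servers = kafka-1:9092
+    a1.channels.k1.kafka.topic = channel1
+    # consumer-specific setting, prefixed with kafka.consumer.
+    a1.channels.k1.kafka.consumer.auto.offset.reset = earliest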
 
 Required properties are in **bold**.
 
-======================  ==========================  
===============================================================================================================
-Property Name           Default                           Description
-======================  ==========================  
===============================================================================================================
-**type**                --                          The component type name, 
needs to be ``org.apache.flume.channel.kafka.KafkaChannel``
-**brokerList**          --                          List of brokers in the 
Kafka cluster used by the channel
-                                                    This can be a partial list 
of brokers, but we recommend at least two for HA.
-                                                    The format is comma 
separated list of hostname:port
-**zookeeperConnect**    --                          URI of ZooKeeper used by 
Kafka cluster
-                                                    The format is comma 
separated list of hostname:port. If chroot is used, it is added once at the end.
-                                                    For example: 
zookeeper-1:2181,zookeeper-2:2182,zookeeper-3:2181/kafka
-topic                   flume-channel               Kafka topic which the 
channel will use
-groupId                 flume                       Consumer group ID the 
channel uses to register with Kafka.
-                                                    Multiple channels must use 
the same topic and group to ensure that when one agent fails another can get 
the data
-                                                    Note that having 
non-channel consumers with the same ID can lead to data loss.
-parseAsFlumeEvent       true                        Expecting Avro datums with 
FlumeEvent schema in the channel.
-                                                    This should be true if 
Flume source is writing to the channel
-                                                    And false if other 
producers are writing into the topic that the channel is using
-                                                    Flume source messages to 
Kafka can be parsed outside of Flume by using
-                                                    
org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk 
artifact
-readSmallestOffset      false                       When set to true, the 
channel will read all data in the topic, starting from the oldest event
-                                                    when false, it will read 
only events written after the channel started
-                                                    When "parseAsFlumeEvent" 
is true, this will be false. Flume source will start prior to the sinks and this
-                                                    guarantees that events 
sent by source before sinks start will not be lost.
-Other Kafka Properties  --                          These properties are used 
to configure the Kafka Producer and Consumer used by the channel.
-                                                    Any property supported by 
Kafka can be used.
-                                                    The only requirement is to 
prepend the property name with the prefix ``kafka.``.
-                                                    For example: 
kafka.producer.type
-======================  ==========================  
===============================================================================================================
+=======================================  ==========================  
===============================================================================================================
+Property Name                            Default                     
Description
+=======================================  ==========================  
===============================================================================================================
+**type**                                 --                          The 
component type name, needs to be ``org.apache.flume.channel.kafka.KafkaChannel``
+**kafka.bootstrap.servers**              --                          List of 
brokers in the Kafka cluster used by the channel
+                                                                     This can 
be a partial list of brokers, but we recommend at least two for HA.
+                                                                     The 
format is comma separated list of hostname:port
+kafka.topic                              flume-channel               Kafka 
topic which the channel will use
+kafka.consumer.group.id                  flume                       Consumer 
group ID the channel uses to register with Kafka.
+                                                                     Multiple 
channels must use the same topic and group to ensure that when one agent fails 
another can get the data
+                                                                     Note that 
having non-channel consumers with the same ID can lead to data loss.
+
+parseAsFlumeEvent                        true                        Expecting 
Avro datums with FlumeEvent schema in the channel.
+                                                                     This 
should be true if Flume source is writing to the channel and false if other 
producers are
+                                                                     writing 
into the topic that the channel is using. Flume source messages to Kafka can be 
parsed outside of Flume by using
+                                                                     
org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk 
artifact
+migrateZookeeperOffsets                  true                        When no 
Kafka stored offset is found, look up the offsets in Zookeeper and commit them 
to Kafka.
+                                                                     This 
should be true to support seamless Kafka client migration from older versions 
of Flume. Once migrated this can be set
+                                                                     to false, 
though that should generally not be required. If no Zookeeper offset is found 
the kafka.consumer.auto.offset.reset
+                                                                     
configuration defines how offsets are handled.
+pollTimeout                              500                         The 
amount of time (in milliseconds) to wait in the "poll()" call of the consumer.
+                                                                     
https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
+defaultPartitionId                       --                          Specifies 
a Kafka partition ID (integer) for all events in this channel to be sent to, 
unless
+                                                                     overridden by ``partitionIdHeader``. By default, if this property is not set, events will be
+                                                                     
distributed by the Kafka Producer's partitioner - including by ``key`` if 
specified (or by a
+                                                                     
partitioner specified by ``kafka.partitioner.class``).
+partitionIdHeader                        --                          When set, 
the producer will take the value of the field named using the value of this 
property
+                                                                     from the 
event header and send the message to the specified partition of the topic. If 
the
+                                                                     value 
represents an invalid partition the event will not be accepted into the 
channel. If the header value
+                                                                     is 
present then this setting overrides ``defaultPartitionId``.
+kafka.consumer.auto.offset.reset         latest                      What to 
do when there is no initial offset in Kafka or if the current offset does not 
exist any more on the server
+                                                                     (e.g. 
because that data has been deleted):
+                                                                     earliest: 
automatically reset the offset to the earliest offset
+                                                                     latest: 
automatically reset the offset to the latest offset
+                                                                     none: 
throw exception to the consumer if no previous offset is found for the 
consumer's group
+                                                                     anything 
else: throw exception to the consumer.
+kafka.producer.security.protocol         PLAINTEXT                   Set to 
SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of 
security. See below for additional info on secure setup.
+kafka.consumer.security.protocol         PLAINTEXT                   Same as 
kafka.producer.security.protocol but for reading/consuming from Kafka.
+*more producer/consumer security props*                              If using 
SASL_PLAINTEXT, SASL_SSL or SSL refer to `Kafka security 
<http://kafka.apache.org/documentation.html#security>`_ for additional
+                                                                     
properties that need to be set on producer/consumer.
+=======================================  ==========================  
===============================================================================================================
+
+Deprecated Properties
+
+================================  ==========================  
===============================================================================================================
+Property Name                     Default                     Description
+================================  ==========================  
===============================================================================================================
+brokerList                        --                          List of brokers 
in the Kafka cluster used by the channel
+                                                              This can be a 
partial list of brokers, but we recommend at least two for HA.
+                                                              The format is 
comma separated list of hostname:port
+topic                             flume-channel               Use kafka.topic
+groupId                           flume                       Use 
kafka.consumer.group.id
+readSmallestOffset                false                       Use 
kafka.consumer.auto.offset.reset
+
+================================  ==========================  
===============================================================================================================
 
 .. note:: Due to the way the channel is load balanced, there may be duplicate 
events when the agent first starts up
 
@@ -2620,12 +3100,162 @@ Example for agent named a1:
 
 .. code-block:: properties
 
-    a1.channels.channel1.type   = org.apache.flume.channel.kafka.KafkaChannel
-    a1.channels.channel1.capacity = 10000
-    a1.channels.channel1.transactionCapacity = 1000
-    a1.channels.channel1.brokerList=kafka-2:9092,kafka-3:9092
-    a1.channels.channel1.topic=channel1
-    a1.channels.channel1.zookeeperConnect=kafka-1:2181
+    a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
+    a1.channels.channel1.kafka.bootstrap.servers = 
kafka-1:9092,kafka-2:9092,kafka-3:9092
+    a1.channels.channel1.kafka.topic = channel1
+    a1.channels.channel1.kafka.consumer.group.id = flume-consumer
+
+
+**Security and Kafka Channel:**
+
+Secure authentication as well as data encryption is supported on the 
communication channel between Flume and Kafka.
+For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the 
parameter is named SSL, the actual protocol is a TLS implementation) can be 
used from Kafka version 0.9.0.
+
+As of now data encryption is solely provided by SSL/TLS.
+
+Setting ``kafka.producer|consumer.security.protocol`` to any of the following 
values means:
+
+- **SASL_PLAINTEXT** - Kerberos or plaintext authentication with no data 
encryption
+- **SASL_SSL** - Kerberos or plaintext authentication with data encryption
+- **SSL** - TLS based encryption with optional authentication.
+
+.. warning::
+    There is a performance degradation when SSL is enabled,
+    the magnitude of which depends on the CPU type and the JVM implementation.
+    Reference: `Kafka security overview 
<http://kafka.apache.org/documentation#security_overview>`_
+    and the jira for tracking this issue:
+    `KAFKA-2561 <https://issues.apache.org/jira/browse/KAFKA-2561>`_
+
+
+**TLS and Kafka Channel:**
+
+Please read the steps described in `Configuring Kafka Clients SSL 
<http://kafka.apache.org/documentation#security_configclients>`_
+to learn about additional configuration settings for fine tuning, for example any of the following:
+security provider, cipher suites, enabled protocols, truststore or keystore 
types.
+
+Example configuration with server side authentication and data encryption.
+
+.. code-block:: properties
+
+    a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
+    a1.channels.channel1.kafka.bootstrap.servers = 
kafka-1:9093,kafka-2:9093,kafka-3:9093
+    a1.channels.channel1.kafka.topic = channel1
+    a1.channels.channel1.kafka.consumer.group.id = flume-consumer
+    a1.channels.channel1.kafka.producer.security.protocol = SSL
+    a1.channels.channel1.kafka.producer.ssl.truststore.location = 
/path/to/truststore.jks
+    a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to 
access the truststore>
+    a1.channels.channel1.kafka.consumer.security.protocol = SSL
+    a1.channels.channel1.kafka.consumer.ssl.truststore.location = 
/path/to/truststore.jks
+    a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to 
access the truststore>
+
+
+Note: By default the property ``ssl.endpoint.identification.algorithm``
+is not defined, so hostname verification is not performed.
+In order to enable hostname verification, set the following properties
+
+.. code-block:: properties
+
+    a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm 
= HTTPS
+    a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm 
= HTTPS
+
+Once enabled, clients will verify the server's fully qualified domain name 
(FQDN)
+against one of the following two fields:
+
+#) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
+#) Subject Alternative Name (SAN) 
https://tools.ietf.org/html/rfc5280#section-4.2.1.6
+
+If client side authentication is also required then the following should additionally be
+added to the Flume agent configuration. Each Flume agent has to have its own client
+certificate which has to be trusted by the Kafka brokers either individually or by their
+signature chain. A common example is to sign each client certificate with a single Root CA
+which in turn is trusted by the Kafka brokers.
+
+.. code-block:: properties
+
+    a1.channels.channel1.kafka.producer.ssl.keystore.location = 
/path/to/client.keystore.jks
+    a1.channels.channel1.kafka.producer.ssl.keystore.password = <password to 
access the keystore>
+    a1.channels.channel1.kafka.consumer.ssl.keystore.location = 
/path/to/client.keystore.jks
+    a1.channels.channel1.kafka.consumer.ssl.keystore.password = <password to 
access the keystore>
+
+If the keystore and key use different password protection then the ``ssl.key.password``
+property will provide the required additional secret for both the consumer and producer
+keystores:
+
+.. code-block:: properties
+
+    a1.channels.channel1.kafka.producer.ssl.key.password = <password to access 
the key>
+    a1.channels.channel1.kafka.consumer.ssl.key.password = <password to access 
the key>
+
+
+**Kerberos and Kafka Channel:**
+
+To use the Kafka channel with a Kafka cluster secured with Kerberos, set the
+``producer/consumer.security.protocol`` properties noted above for the producer
+and/or consumer.
+The Kerberos keytab and principal to be used with the Kafka brokers are specified
+in a JAAS file's "KafkaClient" section. The "Client" section describes the
+Zookeeper connection if needed.
+See the `Kafka doc
+<http://kafka.apache.org/documentation.html#security_sasl_clientconfig>`_

[... 185 lines stripped ...]

